From 5c52f5149e6ceef6e6a3d1b563eadefd1470eb59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Prante?= Date: Tue, 28 Mar 2023 17:51:03 +0200 Subject: [PATCH] ensure that all requests are closed and released after requests have been dispatched, this should prevent netty to report bytebuf leaks --- .../http/server/netty/HttpRequestBuilder.java | 13 +++ .../server/netty/HttpResponseBuilder.java | 13 ++- .../http/server/netty/NettyHttpServer.java | 8 +- .../http/server/nio/HttpRequestBuilder.java | 5 + .../http/server/nio/HttpResponseBuilder.java | 4 + .../server/simple/HttpRequestBuilder.java | 6 ++ .../server/simple/HttpResponseBuilder.java | 5 + .../net/http/server/ApplicationBuilder.java | 7 +- .../xbib/net/http/server/BaseApplication.java | 96 +++++++++++++++---- .../http/server/BaseApplicationBuilder.java | 27 +++++- .../net/http/server/HttpRequestBuilder.java | 4 +- .../net/http/server/HttpResponseBuilder.java | 3 +- .../util/BlockingThreadPoolExecutor.java | 12 ++- 13 files changed, 160 insertions(+), 43 deletions(-) diff --git a/net-http-server-netty/src/main/java/org/xbib/net/http/server/netty/HttpRequestBuilder.java b/net-http-server-netty/src/main/java/org/xbib/net/http/server/netty/HttpRequestBuilder.java index 5e49e3e..24f1449 100644 --- a/net-http-server-netty/src/main/java/org/xbib/net/http/server/netty/HttpRequestBuilder.java +++ b/net-http-server-netty/src/main/java/org/xbib/net/http/server/netty/HttpRequestBuilder.java @@ -9,13 +9,18 @@ import org.xbib.net.http.HttpMethod; import org.xbib.net.http.HttpVersion; import org.xbib.net.http.server.BaseHttpRequestBuilder; +import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.charset.Charset; +import java.util.logging.Level; +import java.util.logging.Logger; public class HttpRequestBuilder extends BaseHttpRequestBuilder { + private static final Logger logger = Logger.getLogger(HttpRequestBuilder.class.getName()); + protected FullHttpRequest fullHttpRequest; protected ByteBuffer byteBuffer; @@ -109,4 +114,12 @@ public class HttpRequestBuilder extends BaseHttpRequestBuilder { public HttpRequest build() { return new HttpRequest(this); } + + @Override + public void close() throws IOException { + if (fullHttpRequest != null) { + logger.log(Level.FINER, "releasing retained netty request"); + fullHttpRequest.release(); + } + } } diff --git a/net-http-server-netty/src/main/java/org/xbib/net/http/server/netty/HttpResponseBuilder.java b/net-http-server-netty/src/main/java/org/xbib/net/http/server/netty/HttpResponseBuilder.java index d6e92c2..b6f8ec2 100644 --- a/net-http-server-netty/src/main/java/org/xbib/net/http/server/netty/HttpResponseBuilder.java +++ b/net-http-server-netty/src/main/java/org/xbib/net/http/server/netty/HttpResponseBuilder.java @@ -128,9 +128,12 @@ public class HttpResponseBuilder extends BaseHttpResponseBuilder { internalBufferWrite(Unpooled.buffer(0)); } - public void close() { - logger.log(Level.FINER, "closing channel " + ctx.channel()); - ctx.close(); + @Override + public void close() throws IOException { + if (ctx.channel().isOpen()) { + logger.log(Level.FINER, "closing netty channel " + ctx.channel()); + ctx.close(); + } } @Override @@ -147,10 +150,6 @@ public class HttpResponseBuilder extends BaseHttpResponseBuilder { } else if (inputStream != null) { internalWrite(inputStream, bufferSize, true); } - if (shouldFlush()) { - // really server flush? - //flush(); - } return new HttpResponse(this); } diff --git a/net-http-server-netty/src/main/java/org/xbib/net/http/server/netty/NettyHttpServer.java b/net-http-server-netty/src/main/java/org/xbib/net/http/server/netty/NettyHttpServer.java index 85aa8f2..3294017 100644 --- a/net-http-server-netty/src/main/java/org/xbib/net/http/server/netty/NettyHttpServer.java +++ b/net-http-server-netty/src/main/java/org/xbib/net/http/server/netty/NettyHttpServer.java @@ -135,11 +135,11 @@ public class NettyHttpServer implements HttpServer { channelFuture.channel().closeFuture() .addListener((ChannelFutureListener) future -> { future.await(); - logger.log(Level.INFO, "future " + future + " awaited"); + logger.log(Level.FINER, "future " + future + " awaited"); }); channels.add(channelFuture.sync().channel()); channelFuture.await(); - logger.log(Level.INFO, () -> channelFuture.channel() + " ready, listening"); + logger.log(Level.FINER, () -> channelFuture.channel() + " ready, listening"); } catch (InterruptedException e) { logger.log(Level.WARNING, e.getMessage(), e); } @@ -152,7 +152,7 @@ public class NettyHttpServer implements HttpServer { try { ChannelFuture channelFuture = channel.closeFuture().sync(); if (channelFuture.isDone()) { - logger.log(Level.INFO, () -> channel + " close future synced"); + logger.log(Level.FINER, () -> channel + " close future synced"); } } catch (InterruptedException e) { logger.log(Level.WARNING, e.getMessage(), e); @@ -192,7 +192,7 @@ public class NettyHttpServer implements HttpServer { for (ChannelFuture channelFuture : channelFutures) { if (channelFuture != null && !channelFuture.isDone()) { if (channelFuture.channel().isOpen()) { - logger.log(Level.INFO, "closing channel future"); + logger.log(Level.FINER, "closing channel future"); channelFuture.channel().close(); } channelFuture.cancel(true); diff --git a/net-http-server-nio/src/main/java/org/xbib/net/http/server/nio/HttpRequestBuilder.java b/net-http-server-nio/src/main/java/org/xbib/net/http/server/nio/HttpRequestBuilder.java index 88aa3b2..1d1188e 100644 --- a/net-http-server-nio/src/main/java/org/xbib/net/http/server/nio/HttpRequestBuilder.java +++ b/net-http-server-nio/src/main/java/org/xbib/net/http/server/nio/HttpRequestBuilder.java @@ -7,6 +7,7 @@ import org.xbib.net.http.HttpMethod; import org.xbib.net.http.HttpVersion; import org.xbib.net.http.server.BaseHttpRequestBuilder; +import java.io.IOException; import java.io.InputStream; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -96,4 +97,8 @@ public class HttpRequestBuilder extends BaseHttpRequestBuilder { public InputStream getInputStream() { return null; } + + @Override + public void close() throws IOException { + } } diff --git a/net-http-server-nio/src/main/java/org/xbib/net/http/server/nio/HttpResponseBuilder.java b/net-http-server-nio/src/main/java/org/xbib/net/http/server/nio/HttpResponseBuilder.java index c531472..8a5bcf4 100644 --- a/net-http-server-nio/src/main/java/org/xbib/net/http/server/nio/HttpResponseBuilder.java +++ b/net-http-server-nio/src/main/java/org/xbib/net/http/server/nio/HttpResponseBuilder.java @@ -114,4 +114,8 @@ public class HttpResponseBuilder extends BaseHttpResponseBuilder { channel.write(fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, contentLength)); } } + + @Override + public void close() throws IOException { + } } diff --git a/net-http-server-simple/src/main/java/org/xbib/net/http/server/simple/HttpRequestBuilder.java b/net-http-server-simple/src/main/java/org/xbib/net/http/server/simple/HttpRequestBuilder.java index 2ab50d0..716b32a 100644 --- a/net-http-server-simple/src/main/java/org/xbib/net/http/server/simple/HttpRequestBuilder.java +++ b/net-http-server-simple/src/main/java/org/xbib/net/http/server/simple/HttpRequestBuilder.java @@ -7,6 +7,7 @@ import org.xbib.net.http.HttpMethod; import org.xbib.net.http.HttpVersion; import org.xbib.net.http.server.BaseHttpRequestBuilder; +import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -97,4 +98,9 @@ public class HttpRequestBuilder extends BaseHttpRequestBuilder { public HttpRequest build() { return new HttpRequest(this); } + + @Override + public void close() throws IOException { + + } } diff --git a/net-http-server-simple/src/main/java/org/xbib/net/http/server/simple/HttpResponseBuilder.java b/net-http-server-simple/src/main/java/org/xbib/net/http/server/simple/HttpResponseBuilder.java index 580bfe8..a3d7ea6 100644 --- a/net-http-server-simple/src/main/java/org/xbib/net/http/server/simple/HttpResponseBuilder.java +++ b/net-http-server-simple/src/main/java/org/xbib/net/http/server/simple/HttpResponseBuilder.java @@ -118,4 +118,9 @@ public class HttpResponseBuilder extends BaseHttpResponseBuilder { channel.write(fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, contentLength)); } } + + @Override + public void close() throws IOException { + + } } diff --git a/net-http-server/src/main/java/org/xbib/net/http/server/ApplicationBuilder.java b/net-http-server/src/main/java/org/xbib/net/http/server/ApplicationBuilder.java index 91c13cf..2b75a2d 100644 --- a/net-http-server/src/main/java/org/xbib/net/http/server/ApplicationBuilder.java +++ b/net-http-server/src/main/java/org/xbib/net/http/server/ApplicationBuilder.java @@ -5,12 +5,17 @@ import org.xbib.net.http.server.route.HttpRouter; import java.nio.file.Path; import java.time.ZoneId; import java.util.Locale; +import java.util.concurrent.TimeUnit; public interface ApplicationBuilder { ApplicationBuilder setThreadCount(int blockingThreadCount); - ApplicationBuilder setQueueCount(int blockingQueueCount); + ApplicationBuilder setQueueCount(int blockingThreadQueueCount); + + ApplicationBuilder setKeepAliveTime(int keepAliveTime); + + ApplicationBuilder setKeepAliveTimeUnit(TimeUnit keepAliveTimeUnit); ApplicationBuilder setHome(Path home); diff --git a/net-http-server/src/main/java/org/xbib/net/http/server/BaseApplication.java b/net-http-server/src/main/java/org/xbib/net/http/server/BaseApplication.java index 90267d8..48e4cfe 100644 --- a/net-http-server/src/main/java/org/xbib/net/http/server/BaseApplication.java +++ b/net-http-server/src/main/java/org/xbib/net/http/server/BaseApplication.java @@ -59,7 +59,8 @@ public class BaseApplication implements Application { protected BaseApplication(BaseApplicationBuilder builder) { this.builder = builder; - this.executor = new BlockingThreadPoolExecutor(builder.blockingThreadCount, builder.blockingQueueCount, + this.executor = new BlockingThreadPoolExecutor(builder.blockingThreadCount, builder.blockingThreadQueueCount, + builder.blockingThreadKeepAliveTime, builder.blockingThreadKeepAliveTimeUnit, new NamedThreadFactory("org-xbib-net-http-server-application")); this.executor.setRejectedExecutionHandler((runnable, threadPoolExecutor) -> logger.log(Level.SEVERE, "rejected " + runnable + " for thread pool executor = " + threadPoolExecutor)); @@ -125,15 +126,10 @@ public class BaseApplication implements Application { } @Override - public void dispatch(HttpRequestBuilder requestBuilder, HttpResponseBuilder responseBuilder) { - Future future = executor.submit(() -> { - try { - getRouter().route(requestBuilder, responseBuilder); - } catch (Throwable t) { - logger.log(Level.SEVERE, t.getMessage(), t); - throw t; - } - }); + public void dispatch(HttpRequestBuilder httpRequestBuilder, + HttpResponseBuilder httpResponseBuilder) { + Submittable submittable = new Submittable(httpRequestBuilder, httpResponseBuilder); + Future future = executor.submit(submittable); logger.log(Level.FINE, "dispatching " + future); } @@ -141,16 +137,11 @@ public class BaseApplication implements Application { public void dispatch(HttpRequestBuilder httpRequestBuilder, HttpResponseBuilder httpResponseBuilder, HttpResponseStatus httpResponseStatus) { - Future future = executor.submit(() -> { - HttpServerContext httpServerContext = createContext(null, httpRequestBuilder, httpResponseBuilder); - httpServerContext.getAttributes().put("responsebuilder", httpResponseBuilder); - try { - getRouter().routeStatus(httpResponseStatus, httpServerContext); - } catch (Throwable t) { - logger.log(Level.SEVERE, t.getMessage(), t); - throw t; - } - }); + HttpServerContext httpServerContext = createContext(null, httpRequestBuilder, httpResponseBuilder); + httpServerContext.getAttributes().put("responsebuilder", httpResponseBuilder); + StatusSubmittable submittable = new StatusSubmittable(httpRequestBuilder, httpResponseBuilder, + httpResponseStatus, httpServerContext); + Future future = executor.submit(submittable); logger.log(Level.FINE, "dispatching status " + future); } @@ -339,4 +330,69 @@ public class BaseApplication implements Application { } logger.log(Level.INFO, "application closed"); } + + private class Submittable implements Runnable, Closeable { + + private final HttpRequestBuilder httpRequestBuilder; + + private final HttpResponseBuilder httpResponseBuilder; + + private Submittable(HttpRequestBuilder httpRequestBuilder, + HttpResponseBuilder httpResponseBuilder) { + this.httpRequestBuilder = httpRequestBuilder; + this.httpResponseBuilder = httpResponseBuilder; + } + + @Override + public void run() { + try { + getRouter().route(httpRequestBuilder, httpResponseBuilder); + } catch (Throwable t) { + logger.log(Level.SEVERE, t.getMessage(), t); + throw t; + } + } + + @Override + public void close() throws IOException { + httpRequestBuilder.close(); + httpResponseBuilder.close(); + } + } + + private class StatusSubmittable implements Runnable, Closeable { + private final HttpRequestBuilder httpRequestBuilder; + + private final HttpResponseBuilder httpResponseBuilder; + + private final HttpResponseStatus httpResponseStatus; + + private final HttpServerContext httpServerContext; + + private StatusSubmittable(HttpRequestBuilder httpRequestBuilder, + HttpResponseBuilder httpResponseBuilder, + HttpResponseStatus httpResponseStatus, + HttpServerContext httpServerContext) { + this.httpRequestBuilder = httpRequestBuilder; + this.httpResponseBuilder = httpResponseBuilder; + this.httpResponseStatus = httpResponseStatus; + this.httpServerContext = httpServerContext; + } + + @Override + public void run() { + try { + getRouter().routeStatus(httpResponseStatus, httpServerContext); + } catch (Throwable t) { + logger.log(Level.SEVERE, t.getMessage(), t); + throw t; + } + } + + @Override + public void close() throws IOException { + httpRequestBuilder.close(); + httpResponseBuilder.close(); + } + } } diff --git a/net-http-server/src/main/java/org/xbib/net/http/server/BaseApplicationBuilder.java b/net-http-server/src/main/java/org/xbib/net/http/server/BaseApplicationBuilder.java index 7427d22..d88782d 100644 --- a/net-http-server/src/main/java/org/xbib/net/http/server/BaseApplicationBuilder.java +++ b/net-http-server/src/main/java/org/xbib/net/http/server/BaseApplicationBuilder.java @@ -17,6 +17,7 @@ import java.util.Map; import java.util.Optional; import java.util.ServiceLoader; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; @@ -37,7 +38,11 @@ public class BaseApplicationBuilder implements ApplicationBuilder { protected int blockingThreadCount; - protected int blockingQueueCount; + protected int blockingThreadQueueCount; + + protected int blockingThreadKeepAliveTime; + + protected TimeUnit blockingThreadKeepAliveTimeUnit; protected Path home; @@ -66,7 +71,9 @@ public class BaseApplicationBuilder implements ApplicationBuilder { protected BaseApplicationBuilder() { this.classLoader = getClass().getClassLoader(); this.blockingThreadCount = Runtime.getRuntime().availableProcessors(); - this.blockingQueueCount = Integer.MAX_VALUE; + this.blockingThreadQueueCount = Integer.MAX_VALUE; + this.blockingThreadKeepAliveTime = 60; + this.blockingThreadKeepAliveTimeUnit = TimeUnit.SECONDS; this.home = Paths.get(System.getProperties().containsKey("application.home") ? System.getProperty("application.home") : "."); this.contextPath = "/"; this.secret = "secret"; @@ -88,8 +95,20 @@ public class BaseApplicationBuilder implements ApplicationBuilder { } @Override - public BaseApplicationBuilder setQueueCount(int blockingQueueCount) { - this.blockingQueueCount = blockingQueueCount; + public BaseApplicationBuilder setQueueCount(int blockingThreadQueueCount) { + this.blockingThreadQueueCount = blockingThreadQueueCount; + return this; + } + + @Override + public BaseApplicationBuilder setKeepAliveTime(int blockingThreadKeepAliveTime) { + this.blockingThreadKeepAliveTime = blockingThreadKeepAliveTime; + return this; + } + + @Override + public BaseApplicationBuilder setKeepAliveTimeUnit(TimeUnit blockingThreadKeepAliveTimeUnit) { + this.blockingThreadKeepAliveTimeUnit = blockingThreadKeepAliveTimeUnit; return this; } diff --git a/net-http-server/src/main/java/org/xbib/net/http/server/HttpRequestBuilder.java b/net-http-server/src/main/java/org/xbib/net/http/server/HttpRequestBuilder.java index b0df0d7..6e230a7 100644 --- a/net-http-server/src/main/java/org/xbib/net/http/server/HttpRequestBuilder.java +++ b/net-http-server/src/main/java/org/xbib/net/http/server/HttpRequestBuilder.java @@ -7,10 +7,11 @@ import org.xbib.net.http.HttpHeaders; import org.xbib.net.http.HttpMethod; import org.xbib.net.http.HttpVersion; +import java.io.Closeable; import java.nio.CharBuffer; import java.nio.charset.Charset; -public interface HttpRequestBuilder { +public interface HttpRequestBuilder extends Closeable { HttpRequestBuilder setAddress(HttpAddress httpAddress); @@ -47,4 +48,5 @@ public interface HttpRequestBuilder { HttpRequest build(); void done(); + } diff --git a/net-http-server/src/main/java/org/xbib/net/http/server/HttpResponseBuilder.java b/net-http-server/src/main/java/org/xbib/net/http/server/HttpResponseBuilder.java index dc93162..541c48d 100644 --- a/net-http-server/src/main/java/org/xbib/net/http/server/HttpResponseBuilder.java +++ b/net-http-server/src/main/java/org/xbib/net/http/server/HttpResponseBuilder.java @@ -8,12 +8,13 @@ import org.xbib.net.http.HttpResponseStatus; import org.xbib.net.http.HttpVersion; import org.xbib.net.http.cookie.Cookie; +import java.io.Closeable; import java.io.InputStream; import java.nio.CharBuffer; import java.nio.channels.FileChannel; import java.nio.charset.Charset; -public interface HttpResponseBuilder { +public interface HttpResponseBuilder extends Closeable { HttpResponseBuilder setDataBufferFactory(DataBufferFactory dataBufferFactory); diff --git a/net-http-server/src/main/java/org/xbib/net/http/server/util/BlockingThreadPoolExecutor.java b/net-http-server/src/main/java/org/xbib/net/http/server/util/BlockingThreadPoolExecutor.java index 62d0479..549210e 100644 --- a/net-http-server/src/main/java/org/xbib/net/http/server/util/BlockingThreadPoolExecutor.java +++ b/net-http-server/src/main/java/org/xbib/net/http/server/util/BlockingThreadPoolExecutor.java @@ -1,5 +1,7 @@ package org.xbib.net.http.server.util; +import java.io.Closeable; +import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CancellationException; @@ -17,11 +19,6 @@ public class BlockingThreadPoolExecutor extends ThreadPoolExecutor { private final Logger logger = Logger.getLogger(BlockingThreadPoolExecutor.class.getName()); - public BlockingThreadPoolExecutor(int nThreads, int maxQueue, - ThreadFactory threadFactory) { - this(nThreads, maxQueue, 60L, TimeUnit.SECONDS, threadFactory); - } - public BlockingThreadPoolExecutor(int nThreads, int maxQueue, long keepAliveTime, TimeUnit timeUnit, ThreadFactory threadFactory) { @@ -52,6 +49,9 @@ public class BlockingThreadPoolExecutor extends ThreadPoolExecutor { logger.log(Level.FINE, "waiting for " + future); future.get(); } + if (future instanceof Closeable closeable) { + closeable.close(); + } } catch (CancellationException ce) { logger.log(Level.FINE, ce.getMessage(), ce); throwable = ce; @@ -61,6 +61,8 @@ public class BlockingThreadPoolExecutor extends ThreadPoolExecutor { } catch (InterruptedException ie) { Thread.currentThread().interrupt(); logger.log(Level.FINE, ie.getMessage(), ie); + } catch (IOException e) { + logger.log(Level.FINE, e.getMessage(), e); } } if (throwable != null) {