ensure that all requests are closed and released after requests have been dispatched, this should prevent netty to report bytebuf leaks

This commit is contained in:
Jörg Prante 2023-03-28 17:51:03 +02:00
parent 08e965d588
commit 5c52f5149e
13 changed files with 160 additions and 43 deletions

View file

@ -9,13 +9,18 @@ import org.xbib.net.http.HttpMethod;
import org.xbib.net.http.HttpVersion; import org.xbib.net.http.HttpVersion;
import org.xbib.net.http.server.BaseHttpRequestBuilder; import org.xbib.net.http.server.BaseHttpRequestBuilder;
import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.CharBuffer; import java.nio.CharBuffer;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.logging.Level;
import java.util.logging.Logger;
public class HttpRequestBuilder extends BaseHttpRequestBuilder { public class HttpRequestBuilder extends BaseHttpRequestBuilder {
private static final Logger logger = Logger.getLogger(HttpRequestBuilder.class.getName());
protected FullHttpRequest fullHttpRequest; protected FullHttpRequest fullHttpRequest;
protected ByteBuffer byteBuffer; protected ByteBuffer byteBuffer;
@ -109,4 +114,12 @@ public class HttpRequestBuilder extends BaseHttpRequestBuilder {
public HttpRequest build() { public HttpRequest build() {
return new HttpRequest(this); return new HttpRequest(this);
} }
@Override
public void close() throws IOException {
if (fullHttpRequest != null) {
logger.log(Level.FINER, "releasing retained netty request");
fullHttpRequest.release();
}
}
} }

View file

@ -128,9 +128,12 @@ public class HttpResponseBuilder extends BaseHttpResponseBuilder {
internalBufferWrite(Unpooled.buffer(0)); internalBufferWrite(Unpooled.buffer(0));
} }
public void close() { @Override
logger.log(Level.FINER, "closing channel " + ctx.channel()); public void close() throws IOException {
ctx.close(); if (ctx.channel().isOpen()) {
logger.log(Level.FINER, "closing netty channel " + ctx.channel());
ctx.close();
}
} }
@Override @Override
@ -147,10 +150,6 @@ public class HttpResponseBuilder extends BaseHttpResponseBuilder {
} else if (inputStream != null) { } else if (inputStream != null) {
internalWrite(inputStream, bufferSize, true); internalWrite(inputStream, bufferSize, true);
} }
if (shouldFlush()) {
// really server flush?
//flush();
}
return new HttpResponse(this); return new HttpResponse(this);
} }

View file

@ -135,11 +135,11 @@ public class NettyHttpServer implements HttpServer {
channelFuture.channel().closeFuture() channelFuture.channel().closeFuture()
.addListener((ChannelFutureListener) future -> { .addListener((ChannelFutureListener) future -> {
future.await(); future.await();
logger.log(Level.INFO, "future " + future + " awaited"); logger.log(Level.FINER, "future " + future + " awaited");
}); });
channels.add(channelFuture.sync().channel()); channels.add(channelFuture.sync().channel());
channelFuture.await(); channelFuture.await();
logger.log(Level.INFO, () -> channelFuture.channel() + " ready, listening"); logger.log(Level.FINER, () -> channelFuture.channel() + " ready, listening");
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.log(Level.WARNING, e.getMessage(), e); logger.log(Level.WARNING, e.getMessage(), e);
} }
@ -152,7 +152,7 @@ public class NettyHttpServer implements HttpServer {
try { try {
ChannelFuture channelFuture = channel.closeFuture().sync(); ChannelFuture channelFuture = channel.closeFuture().sync();
if (channelFuture.isDone()) { if (channelFuture.isDone()) {
logger.log(Level.INFO, () -> channel + " close future synced"); logger.log(Level.FINER, () -> channel + " close future synced");
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
logger.log(Level.WARNING, e.getMessage(), e); logger.log(Level.WARNING, e.getMessage(), e);
@ -192,7 +192,7 @@ public class NettyHttpServer implements HttpServer {
for (ChannelFuture channelFuture : channelFutures) { for (ChannelFuture channelFuture : channelFutures) {
if (channelFuture != null && !channelFuture.isDone()) { if (channelFuture != null && !channelFuture.isDone()) {
if (channelFuture.channel().isOpen()) { if (channelFuture.channel().isOpen()) {
logger.log(Level.INFO, "closing channel future"); logger.log(Level.FINER, "closing channel future");
channelFuture.channel().close(); channelFuture.channel().close();
} }
channelFuture.cancel(true); channelFuture.cancel(true);

View file

@ -7,6 +7,7 @@ import org.xbib.net.http.HttpMethod;
import org.xbib.net.http.HttpVersion; import org.xbib.net.http.HttpVersion;
import org.xbib.net.http.server.BaseHttpRequestBuilder; import org.xbib.net.http.server.BaseHttpRequestBuilder;
import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -96,4 +97,8 @@ public class HttpRequestBuilder extends BaseHttpRequestBuilder {
public InputStream getInputStream() { public InputStream getInputStream() {
return null; return null;
} }
@Override
public void close() throws IOException {
}
} }

View file

@ -114,4 +114,8 @@ public class HttpResponseBuilder extends BaseHttpResponseBuilder {
channel.write(fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, contentLength)); channel.write(fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, contentLength));
} }
} }
@Override
public void close() throws IOException {
}
} }

View file

@ -7,6 +7,7 @@ import org.xbib.net.http.HttpMethod;
import org.xbib.net.http.HttpVersion; import org.xbib.net.http.HttpVersion;
import org.xbib.net.http.server.BaseHttpRequestBuilder; import org.xbib.net.http.server.BaseHttpRequestBuilder;
import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -97,4 +98,9 @@ public class HttpRequestBuilder extends BaseHttpRequestBuilder {
public HttpRequest build() { public HttpRequest build() {
return new HttpRequest(this); return new HttpRequest(this);
} }
@Override
public void close() throws IOException {
}
} }

View file

@ -118,4 +118,9 @@ public class HttpResponseBuilder extends BaseHttpResponseBuilder {
channel.write(fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, contentLength)); channel.write(fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, contentLength));
} }
} }
@Override
public void close() throws IOException {
}
} }

View file

@ -5,12 +5,17 @@ import org.xbib.net.http.server.route.HttpRouter;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.ZoneId; import java.time.ZoneId;
import java.util.Locale; import java.util.Locale;
import java.util.concurrent.TimeUnit;
public interface ApplicationBuilder { public interface ApplicationBuilder {
ApplicationBuilder setThreadCount(int blockingThreadCount); ApplicationBuilder setThreadCount(int blockingThreadCount);
ApplicationBuilder setQueueCount(int blockingQueueCount); ApplicationBuilder setQueueCount(int blockingThreadQueueCount);
ApplicationBuilder setKeepAliveTime(int keepAliveTime);
ApplicationBuilder setKeepAliveTimeUnit(TimeUnit keepAliveTimeUnit);
ApplicationBuilder setHome(Path home); ApplicationBuilder setHome(Path home);

View file

@ -59,7 +59,8 @@ public class BaseApplication implements Application {
protected BaseApplication(BaseApplicationBuilder builder) { protected BaseApplication(BaseApplicationBuilder builder) {
this.builder = builder; this.builder = builder;
this.executor = new BlockingThreadPoolExecutor(builder.blockingThreadCount, builder.blockingQueueCount, this.executor = new BlockingThreadPoolExecutor(builder.blockingThreadCount, builder.blockingThreadQueueCount,
builder.blockingThreadKeepAliveTime, builder.blockingThreadKeepAliveTimeUnit,
new NamedThreadFactory("org-xbib-net-http-server-application")); new NamedThreadFactory("org-xbib-net-http-server-application"));
this.executor.setRejectedExecutionHandler((runnable, threadPoolExecutor) -> this.executor.setRejectedExecutionHandler((runnable, threadPoolExecutor) ->
logger.log(Level.SEVERE, "rejected " + runnable + " for thread pool executor = " + threadPoolExecutor)); logger.log(Level.SEVERE, "rejected " + runnable + " for thread pool executor = " + threadPoolExecutor));
@ -125,15 +126,10 @@ public class BaseApplication implements Application {
} }
@Override @Override
public void dispatch(HttpRequestBuilder requestBuilder, HttpResponseBuilder responseBuilder) { public void dispatch(HttpRequestBuilder httpRequestBuilder,
Future<?> future = executor.submit(() -> { HttpResponseBuilder httpResponseBuilder) {
try { Submittable submittable = new Submittable(httpRequestBuilder, httpResponseBuilder);
getRouter().route(requestBuilder, responseBuilder); Future<?> future = executor.submit(submittable);
} catch (Throwable t) {
logger.log(Level.SEVERE, t.getMessage(), t);
throw t;
}
});
logger.log(Level.FINE, "dispatching " + future); logger.log(Level.FINE, "dispatching " + future);
} }
@ -141,16 +137,11 @@ public class BaseApplication implements Application {
public void dispatch(HttpRequestBuilder httpRequestBuilder, public void dispatch(HttpRequestBuilder httpRequestBuilder,
HttpResponseBuilder httpResponseBuilder, HttpResponseBuilder httpResponseBuilder,
HttpResponseStatus httpResponseStatus) { HttpResponseStatus httpResponseStatus) {
Future<?> future = executor.submit(() -> { HttpServerContext httpServerContext = createContext(null, httpRequestBuilder, httpResponseBuilder);
HttpServerContext httpServerContext = createContext(null, httpRequestBuilder, httpResponseBuilder); httpServerContext.getAttributes().put("responsebuilder", httpResponseBuilder);
httpServerContext.getAttributes().put("responsebuilder", httpResponseBuilder); StatusSubmittable submittable = new StatusSubmittable(httpRequestBuilder, httpResponseBuilder,
try { httpResponseStatus, httpServerContext);
getRouter().routeStatus(httpResponseStatus, httpServerContext); Future<?> future = executor.submit(submittable);
} catch (Throwable t) {
logger.log(Level.SEVERE, t.getMessage(), t);
throw t;
}
});
logger.log(Level.FINE, "dispatching status " + future); logger.log(Level.FINE, "dispatching status " + future);
} }
@ -339,4 +330,69 @@ public class BaseApplication implements Application {
} }
logger.log(Level.INFO, "application closed"); logger.log(Level.INFO, "application closed");
} }
private class Submittable implements Runnable, Closeable {
private final HttpRequestBuilder httpRequestBuilder;
private final HttpResponseBuilder httpResponseBuilder;
private Submittable(HttpRequestBuilder httpRequestBuilder,
HttpResponseBuilder httpResponseBuilder) {
this.httpRequestBuilder = httpRequestBuilder;
this.httpResponseBuilder = httpResponseBuilder;
}
@Override
public void run() {
try {
getRouter().route(httpRequestBuilder, httpResponseBuilder);
} catch (Throwable t) {
logger.log(Level.SEVERE, t.getMessage(), t);
throw t;
}
}
@Override
public void close() throws IOException {
httpRequestBuilder.close();
httpResponseBuilder.close();
}
}
private class StatusSubmittable implements Runnable, Closeable {
private final HttpRequestBuilder httpRequestBuilder;
private final HttpResponseBuilder httpResponseBuilder;
private final HttpResponseStatus httpResponseStatus;
private final HttpServerContext httpServerContext;
private StatusSubmittable(HttpRequestBuilder httpRequestBuilder,
HttpResponseBuilder httpResponseBuilder,
HttpResponseStatus httpResponseStatus,
HttpServerContext httpServerContext) {
this.httpRequestBuilder = httpRequestBuilder;
this.httpResponseBuilder = httpResponseBuilder;
this.httpResponseStatus = httpResponseStatus;
this.httpServerContext = httpServerContext;
}
@Override
public void run() {
try {
getRouter().routeStatus(httpResponseStatus, httpServerContext);
} catch (Throwable t) {
logger.log(Level.SEVERE, t.getMessage(), t);
throw t;
}
}
@Override
public void close() throws IOException {
httpRequestBuilder.close();
httpResponseBuilder.close();
}
}
} }

View file

@ -17,6 +17,7 @@ import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.ServiceLoader; import java.util.ServiceLoader;
import java.util.Set; import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -37,7 +38,11 @@ public class BaseApplicationBuilder implements ApplicationBuilder {
protected int blockingThreadCount; protected int blockingThreadCount;
protected int blockingQueueCount; protected int blockingThreadQueueCount;
protected int blockingThreadKeepAliveTime;
protected TimeUnit blockingThreadKeepAliveTimeUnit;
protected Path home; protected Path home;
@ -66,7 +71,9 @@ public class BaseApplicationBuilder implements ApplicationBuilder {
protected BaseApplicationBuilder() { protected BaseApplicationBuilder() {
this.classLoader = getClass().getClassLoader(); this.classLoader = getClass().getClassLoader();
this.blockingThreadCount = Runtime.getRuntime().availableProcessors(); this.blockingThreadCount = Runtime.getRuntime().availableProcessors();
this.blockingQueueCount = Integer.MAX_VALUE; this.blockingThreadQueueCount = Integer.MAX_VALUE;
this.blockingThreadKeepAliveTime = 60;
this.blockingThreadKeepAliveTimeUnit = TimeUnit.SECONDS;
this.home = Paths.get(System.getProperties().containsKey("application.home") ? System.getProperty("application.home") : "."); this.home = Paths.get(System.getProperties().containsKey("application.home") ? System.getProperty("application.home") : ".");
this.contextPath = "/"; this.contextPath = "/";
this.secret = "secret"; this.secret = "secret";
@ -88,8 +95,20 @@ public class BaseApplicationBuilder implements ApplicationBuilder {
} }
@Override @Override
public BaseApplicationBuilder setQueueCount(int blockingQueueCount) { public BaseApplicationBuilder setQueueCount(int blockingThreadQueueCount) {
this.blockingQueueCount = blockingQueueCount; this.blockingThreadQueueCount = blockingThreadQueueCount;
return this;
}
@Override
public BaseApplicationBuilder setKeepAliveTime(int blockingThreadKeepAliveTime) {
this.blockingThreadKeepAliveTime = blockingThreadKeepAliveTime;
return this;
}
@Override
public BaseApplicationBuilder setKeepAliveTimeUnit(TimeUnit blockingThreadKeepAliveTimeUnit) {
this.blockingThreadKeepAliveTimeUnit = blockingThreadKeepAliveTimeUnit;
return this; return this;
} }

View file

@ -7,10 +7,11 @@ import org.xbib.net.http.HttpHeaders;
import org.xbib.net.http.HttpMethod; import org.xbib.net.http.HttpMethod;
import org.xbib.net.http.HttpVersion; import org.xbib.net.http.HttpVersion;
import java.io.Closeable;
import java.nio.CharBuffer; import java.nio.CharBuffer;
import java.nio.charset.Charset; import java.nio.charset.Charset;
public interface HttpRequestBuilder { public interface HttpRequestBuilder extends Closeable {
HttpRequestBuilder setAddress(HttpAddress httpAddress); HttpRequestBuilder setAddress(HttpAddress httpAddress);
@ -47,4 +48,5 @@ public interface HttpRequestBuilder {
HttpRequest build(); HttpRequest build();
void done(); void done();
} }

View file

@ -8,12 +8,13 @@ import org.xbib.net.http.HttpResponseStatus;
import org.xbib.net.http.HttpVersion; import org.xbib.net.http.HttpVersion;
import org.xbib.net.http.cookie.Cookie; import org.xbib.net.http.cookie.Cookie;
import java.io.Closeable;
import java.io.InputStream; import java.io.InputStream;
import java.nio.CharBuffer; import java.nio.CharBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.charset.Charset; import java.nio.charset.Charset;
public interface HttpResponseBuilder { public interface HttpResponseBuilder extends Closeable {
HttpResponseBuilder setDataBufferFactory(DataBufferFactory dataBufferFactory); HttpResponseBuilder setDataBufferFactory(DataBufferFactory dataBufferFactory);

View file

@ -1,5 +1,7 @@
package org.xbib.net.http.server.util; package org.xbib.net.http.server.util;
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException; import java.util.concurrent.CancellationException;
@ -17,11 +19,6 @@ public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {
private final Logger logger = Logger.getLogger(BlockingThreadPoolExecutor.class.getName()); private final Logger logger = Logger.getLogger(BlockingThreadPoolExecutor.class.getName());
public BlockingThreadPoolExecutor(int nThreads, int maxQueue,
ThreadFactory threadFactory) {
this(nThreads, maxQueue, 60L, TimeUnit.SECONDS, threadFactory);
}
public BlockingThreadPoolExecutor(int nThreads, int maxQueue, public BlockingThreadPoolExecutor(int nThreads, int maxQueue,
long keepAliveTime, TimeUnit timeUnit, long keepAliveTime, TimeUnit timeUnit,
ThreadFactory threadFactory) { ThreadFactory threadFactory) {
@ -52,6 +49,9 @@ public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {
logger.log(Level.FINE, "waiting for " + future); logger.log(Level.FINE, "waiting for " + future);
future.get(); future.get();
} }
if (future instanceof Closeable closeable) {
closeable.close();
}
} catch (CancellationException ce) { } catch (CancellationException ce) {
logger.log(Level.FINE, ce.getMessage(), ce); logger.log(Level.FINE, ce.getMessage(), ce);
throwable = ce; throwable = ce;
@ -61,6 +61,8 @@ public class BlockingThreadPoolExecutor extends ThreadPoolExecutor {
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
logger.log(Level.FINE, ie.getMessage(), ie); logger.log(Level.FINE, ie.getMessage(), ie);
} catch (IOException e) {
logger.log(Level.FINE, e.getMessage(), e);
} }
} }
if (throwable != null) { if (throwable != null) {