better close/flush

This commit is contained in:
Jörg Prante 2023-03-30 17:44:14 +02:00
parent fc1beabb58
commit dfd3abd4a9
14 changed files with 113 additions and 152 deletions

View file

@ -34,18 +34,15 @@ public class Https1Handler extends ChannelDuplexHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpPipelinedRequest) {
HttpPipelinedRequest httpPipelinedRequest = (HttpPipelinedRequest) msg;
if (msg instanceof HttpPipelinedRequest httpPipelinedRequest) {
try {
if (httpPipelinedRequest.getRequest() instanceof FullHttpRequest) {
FullHttpRequest fullHttpRequest = (FullHttpRequest) httpPipelinedRequest.getRequest();
if (httpPipelinedRequest.getRequest() instanceof FullHttpRequest fullHttpRequest) {
requestReceived(ctx, fullHttpRequest, httpPipelinedRequest.getSequenceId());
}
} finally {
httpPipelinedRequest.release();
}
} else if (msg instanceof FullHttpRequest) {
FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
} else if (msg instanceof FullHttpRequest fullHttpRequest) {
try {
requestReceived(ctx, fullHttpRequest, 0);
} finally {

View file

@ -36,8 +36,7 @@ public class Https2Handler extends ChannelDuplexHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
if (msg instanceof FullHttpRequest fullHttpRequest) {
HttpAddress httpAddress = ctx.channel().attr(NettyHttpsServerConfig.ATTRIBUTE_KEY_HTTP_ADDRESS).get();
try {
Integer streamId = fullHttpRequest.headers().getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text());

View file

@ -13,13 +13,9 @@ import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.logging.Level;
import java.util.logging.Logger;
public class HttpRequestBuilder extends BaseHttpRequestBuilder {
private static final Logger logger = Logger.getLogger(HttpRequestBuilder.class.getName());
protected FullHttpRequest fullHttpRequest;
protected ByteBuffer byteBuffer;
@ -117,7 +113,6 @@ public class HttpRequestBuilder extends BaseHttpRequestBuilder {
@Override
public void release() {
if (fullHttpRequest != null) {
logger.log(Level.FINER, "releasing retained netty request");
fullHttpRequest.release();
}
}

View file

@ -6,24 +6,16 @@ import java.io.IOException;
public class HttpResponse extends BaseHttpResponse {
private final HttpResponseBuilder builder;
protected HttpResponse(HttpResponseBuilder builder) {
super(builder);
this.builder = builder;
}
public static HttpResponseBuilder builder() {
return new HttpResponseBuilder();
}
@Override
public void close() throws IOException {
builder.release();
}
@Override
public void flush() throws IOException {
builder.flush();
// ignore
}
}

View file

@ -90,8 +90,8 @@ public class HttpResponseBuilder extends BaseHttpResponseBuilder {
}
@Override
public HttpResponseBuilder shouldFlush(boolean shouldFlush) {
super.shouldFlush(shouldFlush);
public HttpResponseBuilder withConnectionCloseHeader(boolean close) {
super.withConnectionCloseHeader(close);
return this;
}
@ -124,6 +124,24 @@ public class HttpResponseBuilder extends BaseHttpResponseBuilder {
return this;
}
@Override
public HttpResponse build() {
Objects.requireNonNull(ctx);
if (body != null) {
internalStringWrite(body);
} else if (charBuffer != null && charset != null) {
internalBufferWrite(charBuffer, charset);
} else if (dataBuffer != null) {
internalBufferWrite(dataBuffer);
} else if (fileChannel != null) {
internalFileWrite(fileChannel, bufferSize, true);
} else if (inputStream != null) {
internalStreamWrite(inputStream, bufferSize, true);
}
return new HttpResponse(this);
}
public void flush() {
internalBufferWrite(Unpooled.buffer(0));
}
@ -131,47 +149,30 @@ public class HttpResponseBuilder extends BaseHttpResponseBuilder {
@Override
public void release() {
super.release();
if (ctx != null && ctx.channel().isOpen()) {
logger.log(Level.FINER, "closing netty channel " + ctx.channel());
ctx.close();
}
}
@Override
public HttpResponse build() {
Objects.requireNonNull(ctx);
if (body != null) {
internalWrite(body);
} else if (charBuffer != null && charset != null) {
internalWrite(charBuffer, charset);
} else if (dataBuffer != null) {
internalWrite(dataBuffer);
} else if (fileChannel != null) {
internalWrite(fileChannel, bufferSize, true);
} else if (inputStream != null) {
internalWrite(inputStream, bufferSize, true);
}
return new HttpResponse(this);
private void internalStringWrite(String body) {
internalBufferWrite(dataBufferFactory.wrap(StandardCharsets.UTF_8.encode(body)));
}
private void internalWrite(String body) {
internalWrite(dataBufferFactory.wrap(StandardCharsets.UTF_8.encode(body)));
}
private void internalWrite(DataBuffer dataBuffer) {
private void internalBufferWrite(DataBuffer dataBuffer) {
NettyDataBuffer nettyDataBuffer = (NettyDataBuffer) dataBuffer;
internalBufferWrite(nettyDataBuffer.getNativeBuffer());
}
private void internalWrite(CharBuffer charBuffer, Charset charset) {
private void internalBufferWrite(CharBuffer charBuffer, Charset charset) {
internalBufferWrite(ByteBufUtil.encodeString(ctx.alloc(), charBuffer, charset));
}
private void internalBufferWrite(ByteBuf byteBuf) {
internalBufferWrite(byteBuf, byteBuf.readableBytes());
internalBufferWrite(byteBuf, byteBuf.readableBytes(), true);
}
private void internalBufferWrite(ByteBuf byteBuf, int length) {
private void internalBufferWrite(ByteBuf byteBuf, int length, boolean keepAlive) {
if (!ctx.channel().isWritable()) {
logger.log(Level.WARNING, "the channel " + ctx.channel() + " is not writable");
return;
}
super.buildHeaders(length);
HttpResponseStatus responseStatus = HttpResponseStatus.valueOf(status.code());
HttpHeaders headers = new DefaultHttpHeaders();
@ -186,25 +187,29 @@ public class HttpResponseBuilder extends BaseHttpResponseBuilder {
}
HttpHeaders trailingHeaders = new DefaultHttpHeaders();
super.trailingHeaders.entries().forEach(e -> trailingHeaders.add(e.getKey(), e.getValue()));
FullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.valueOf(version.text()),
responseStatus, byteBuf.retain(), headers, trailingHeaders);
if (!ctx.channel().isWritable()) {
logger.log(Level.WARNING, "we have a problem, the channel " + ctx.channel() + " is not writable");
return;
}
if (sequenceId != null) {
HttpPipelinedResponse httpPipelinedResponse = new HttpPipelinedResponse(fullHttpResponse,
ctx.channel().newPromise(), sequenceId);
ctx.write(httpPipelinedResponse);
} else {
ctx.write(fullHttpResponse);
}
ctx.flush();
ctx.channel().eventLoop().execute(() -> {
FullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.valueOf(version.text()),
responseStatus, byteBuf.retain(), headers, trailingHeaders);
ChannelFuture channelFuture;
if (sequenceId != null) {
HttpPipelinedResponse httpPipelinedResponse = new HttpPipelinedResponse(fullHttpResponse,
ctx.channel().newPromise(), sequenceId);
channelFuture = ctx.write(httpPipelinedResponse);
} else {
channelFuture = ctx.write(fullHttpResponse);
}
if (!keepAlive || shouldClose()) {
logger.log(Level.FINER, "adding close listener to channel future " + channelFuture);
channelFuture.addListener(CLOSE);
}
ctx.flush();
});
}
private void internalWrite(FileChannel fileChannel, int bufferSize, boolean keepAlive) {
private void internalFileWrite(FileChannel fileChannel, int bufferSize, boolean keepAlive) {
if (!ctx.channel().isWritable()) {
logger.log(Level.WARNING, "channel not writeable: " + ctx.channel());
logger.log(Level.WARNING, "the channel is not writeable: " + ctx.channel());
return;
}
HttpResponseStatus responseStatus = HttpResponseStatus.valueOf(status.code());
DefaultHttpResponse rsp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, responseStatus);
@ -216,21 +221,17 @@ public class HttpResponseBuilder extends BaseHttpResponseBuilder {
throw new UncheckedIOException(e);
}
ChannelFuture channelFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
if (headers.containsHeader(HttpHeaderNames.CONTENT_LENGTH)) {
if (!keepAlive) {
logger.log(Level.FINER, "adding close listener to channel future " + channelFuture);
channelFuture.addListener(CLOSE);
}
} else {
if (!keepAlive || shouldClose()) {
logger.log(Level.FINER, "adding close listener to channel future " + channelFuture);
channelFuture.addListener(CLOSE);
}
ctx.flush();
});
}
private void internalWrite(InputStream inputStream, int bufferSize, boolean keepAlive) {
private void internalStreamWrite(InputStream inputStream, int bufferSize, boolean keepAlive) {
if (!ctx.channel().isWritable()) {
logger.log(Level.WARNING, "channel not writeable: " + ctx.channel());
logger.log(Level.WARNING, "the channel is not writeable: " + ctx.channel());
return;
}
ByteBuf buffer;
@ -247,7 +248,7 @@ public class HttpResponseBuilder extends BaseHttpResponseBuilder {
return;
}
if (count < bufferSize) {
internalBufferWrite(buffer, count);
internalBufferWrite(buffer, count, keepAlive);
} else {
// chunked
super.buildHeaders(0);
@ -259,27 +260,25 @@ public class HttpResponseBuilder extends BaseHttpResponseBuilder {
}
HttpHeaders trailingHeaders = new DefaultHttpHeaders();
super.trailingHeaders.entries().forEach(e -> trailingHeaders.add(e.getKey(), e.getValue()));
DefaultHttpResponse defaultHttpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, responseStatus);
if (!headers.contains(HttpHeaderNames.CONTENT_LENGTH)) {
headers.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
} else {
if (keepAlive) {
headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
ctx.channel().eventLoop().execute(() -> {
DefaultHttpResponse defaultHttpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, responseStatus);
if (!headers.contains(HttpHeaderNames.CONTENT_LENGTH)) {
headers.set(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
} else {
if (keepAlive) {
headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
}
}
}
defaultHttpResponse.headers().set(headers);
ctx.write(defaultHttpResponse);
ctx.write(new ChunkedStream(inputStream, bufferSize));
ChannelFuture channelFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
if (headers.contains(HttpHeaderNames.CONTENT_LENGTH)) {
if (!keepAlive) {
defaultHttpResponse.headers().set(headers);
ctx.write(defaultHttpResponse);
ctx.write(new ChunkedStream(inputStream, bufferSize));
ChannelFuture channelFuture = ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
if (!keepAlive || shouldClose) {
logger.log(Level.FINER, "adding close listener to channel future " + channelFuture);
channelFuture.addListener(CLOSE);
}
} else {
logger.log(Level.FINER, "adding close listener to channel future " + channelFuture);
channelFuture.addListener(CLOSE);
}
ctx.flush();
});
}
}
}

View file

@ -36,8 +36,7 @@ public class Http2Handler extends ChannelDuplexHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object object) throws IOException {
if (object instanceof FullHttpRequest) {
FullHttpRequest fullHttpRequest = (FullHttpRequest) object;
if (object instanceof FullHttpRequest fullHttpRequest) {
HttpAddress httpAddress = ctx.channel().attr(NettyHttpServerConfig.ATTRIBUTE_KEY_HTTP_ADDRESS).get();
try {
Integer streamId = fullHttpRequest.headers().getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text());

View file

@ -22,11 +22,6 @@ public class HttpResponse extends BaseHttpResponse {
this.builder = builder;
}
@Override
public void close() throws IOException {
builder.internalClose();
}
@Override
public void flush() throws IOException {
builder.internalFlush();

View file

@ -38,9 +38,6 @@ public class HttpResponseBuilder extends BaseHttpResponseBuilder {
public HttpResponse build() {
Objects.requireNonNull(outputStream);
try {
if (shouldFlush()) {
internalFlush();
}
if (body != null) {
internalWrite(body);
} else if (charBuffer != null && charset != null) {
@ -52,14 +49,20 @@ public class HttpResponseBuilder extends BaseHttpResponseBuilder {
} else if (inputStream != null) {
internalWrite(inputStream, bufferSize);
}
if (shouldClose()) {
internalClose();
}
} catch (IOException e) {
logger.log(Level.SEVERE, e.getMessage(), e);
} return new HttpResponse(this);
}
@Override
public void release() {
try {
internalClose();
} catch (IOException e) {
logger.log(Level.WARNING, e.getMessage(), e);
}
}
void internalFlush() throws IOException {
outputStream.flush();
}
@ -114,8 +117,4 @@ public class HttpResponseBuilder extends BaseHttpResponseBuilder {
channel.write(fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, contentLength));
}
}
@Override
public void release() {
}
}

View file

@ -13,11 +13,6 @@ public class HttpResponse extends BaseHttpResponse {
this.builder = builder;
}
@Override
public void close() throws IOException {
builder.internalClose();
}
@Override
public void flush() throws IOException {
builder.internalFlush();

View file

@ -38,9 +38,6 @@ public class HttpResponseBuilder extends BaseHttpResponseBuilder {
public HttpResponse build() {
Objects.requireNonNull(outputStream);
try {
if (shouldFlush()) {
internalFlush();
}
if (body != null) {
internalWrite(body);
} else if (charBuffer != null && charset != null) {
@ -52,16 +49,22 @@ public class HttpResponseBuilder extends BaseHttpResponseBuilder {
} else if (inputStream != null) {
internalWrite(inputStream, bufferSize);
}
if (shouldClose()) {
internalClose();
}
} catch (IOException e) {
logger.log(Level.SEVERE, e.getMessage(), e);
}
return new HttpResponse(this);
}
void internalFlush() throws IOException {
@Override
public void release() {
try {
internalClose();
} catch (IOException e) {
logger.log(Level.WARNING, e.getMessage(), e);
}
}
void internalFlush() {
write(dataBufferFactory.allocateBuffer());
}
@ -118,8 +121,4 @@ public class HttpResponseBuilder extends BaseHttpResponseBuilder {
channel.write(fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, contentLength));
}
}
@Override
public void release() {
}
}

View file

@ -57,9 +57,9 @@ public abstract class BaseHttpResponseBuilder implements HttpResponseBuilder {
*/
protected HttpServerConfig httpServerConfig;
protected boolean shouldClose;
protected boolean withConnectionCloseHeader;
protected boolean shouldFlush;
protected boolean shouldClose;
protected Integer sequenceId;
@ -103,7 +103,7 @@ public abstract class BaseHttpResponseBuilder implements HttpResponseBuilder {
this.trailingHeaders = new HttpHeaders();
this.contentType = HttpHeaderValues.APPLICATION_OCTET_STREAM;
this.dataBufferFactory = DefaultDataBufferFactory.getInstance();
this.shouldClose = false; // tell client we want to keep the connection alive
this.withConnectionCloseHeader = false; // tell client we want to keep the connection alive
this.attributes = new BaseAttributes();
}
@ -204,23 +204,22 @@ public abstract class BaseHttpResponseBuilder implements HttpResponseBuilder {
return this;
}
@Override
public BaseHttpResponseBuilder shouldFlush(boolean shouldFlush) {
public BaseHttpResponseBuilder withConnectionCloseHeader(boolean withConnectionCloseHeader) {
if (done) {
return this;
}
this.shouldFlush = shouldFlush;
this.withConnectionCloseHeader = withConnectionCloseHeader;
return this;
}
@Override
public boolean shouldFlush() {
return shouldFlush;
public boolean withConnectionCloseHeader() {
return withConnectionCloseHeader;
}
@Override
public BaseHttpResponseBuilder shouldClose(boolean shouldClose) {
public HttpResponseBuilder shouldClose(boolean shouldClose) {
if (done) {
return this;
}
@ -265,7 +264,7 @@ public abstract class BaseHttpResponseBuilder implements HttpResponseBuilder {
if (body != null && this.body == null) {
this.body = body;
} else {
logger.log(Level.WARNING, "cannot write null string");
logger.log(Level.WARNING, "cannot write more than one body");
}
return this;
}
@ -276,7 +275,7 @@ public abstract class BaseHttpResponseBuilder implements HttpResponseBuilder {
this.charBuffer = charBuffer;
this.charset = charset;
} else {
logger.log(Level.WARNING, "cannot write CharBuffer");
logger.log(Level.WARNING, "cannot write more than one CharBuffer");
}
return this;
}
@ -286,7 +285,7 @@ public abstract class BaseHttpResponseBuilder implements HttpResponseBuilder {
if (dataBuffer != null && this.dataBuffer == null) {
this.dataBuffer = dataBuffer;
} else {
logger.log(Level.WARNING, "cannot write DataBuffer");
logger.log(Level.WARNING, "cannot write more than one DataBuffer");
}
return this;
}
@ -297,7 +296,7 @@ public abstract class BaseHttpResponseBuilder implements HttpResponseBuilder {
this.inputStream = inputStream;
this.bufferSize = bufferSize;
} else {
logger.log(Level.WARNING, "cannot write InputStream");
logger.log(Level.WARNING, "cannot write more than one InputStream");
}
return this;
}
@ -308,7 +307,7 @@ public abstract class BaseHttpResponseBuilder implements HttpResponseBuilder {
this.fileChannel = fileChannel;
this.bufferSize = bufferSize;
} else {
logger.log(Level.WARNING, "cannot write FileChannel");
logger.log(Level.WARNING, "cannot write more than one FileChannel");
}
return this;
}
@ -347,7 +346,6 @@ public abstract class BaseHttpResponseBuilder implements HttpResponseBuilder {
@Override
public void release() {
if (dataBuffer != null) {
logger.log(Level.FINER, "databuffer release " + dataBuffer);
dataBuffer.release();
}
}
@ -372,7 +370,7 @@ public abstract class BaseHttpResponseBuilder implements HttpResponseBuilder {
headers.add(HttpHeaderNames.CONTENT_LENGTH, Long.toString(contentLength));
}
}
if (shouldClose) {
if (withConnectionCloseHeader) {
headers.add(HttpHeaderNames.CONNECTION, "close");
}
if (!headers.containsHeader(HttpHeaderNames.DATE)) {

View file

@ -1,9 +1,7 @@
package org.xbib.net.http.server;
import java.io.IOException;
import org.xbib.net.Response;
public interface HttpResponse extends Response {
void close() throws IOException;
}

View file

@ -38,9 +38,9 @@ public interface HttpResponseBuilder {
HttpResponseBuilder addCookie(Cookie cookie);
HttpResponseBuilder shouldFlush(boolean sholdFlush);
HttpResponseBuilder withConnectionCloseHeader(boolean withConnectionCloseHeader);
boolean shouldFlush();
boolean withConnectionCloseHeader();
HttpResponseBuilder shouldClose(boolean shouldClose);

View file

@ -3,7 +3,6 @@ package org.xbib.net.http.server.render;
import java.io.IOException;
import org.xbib.net.http.server.HttpHandler;
import org.xbib.net.http.server.HttpResponse;
import org.xbib.net.http.server.HttpResponseBuilder;
import org.xbib.net.http.server.HttpServerContext;
public class HttpResponseRenderer implements HttpHandler {
@ -13,11 +12,8 @@ public class HttpResponseRenderer implements HttpHandler {
@Override
public void handle(HttpServerContext context) throws IOException {
HttpResponseBuilder httpResponseBuilder = context.response();
// here we do the heavy lifting of rendering all elements for the response
HttpResponse httpResponse = httpResponseBuilder.build();
if (httpResponseBuilder.shouldClose()) {
httpResponse.close();
}
HttpResponse httpResponse = context.response().build();
httpResponse.flush();
}
}