From c2fdcf7d9785ae386562f37174d8378b96a58c5d Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 16 Jan 2024 08:59:49 +0100 Subject: [PATCH 1/2] Ensure QuicStreamChannel.shutdownOutput() is only called once all previous writes were processed. Motivation: We need to ensure QuicStreamChannel.shutdownOutput() is only called once all previous writes were processed. This is necessary as otherwise shutdownOutput() might be called while some writes are still queued (due flowcontrol). Modifications: - Always do the shutdownOutput() via a ChannelFutureListener - Adjust tests Result: Always drain write queue first before shutdown the output. --- .../http3/Http3FrameToHttpObjectCodec.java | 26 +++--- .../Http3FrameToHttpObjectCodecTest.java | 85 ++++++++++++------- 2 files changed, 68 insertions(+), 43 deletions(-) diff --git a/src/main/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodec.java b/src/main/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodec.java index 5d22891..aa2deb3 100644 --- a/src/main/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodec.java +++ b/src/main/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodec.java @@ -50,7 +50,7 @@ * and back. It can be used as an adapter in conjunction with {@link * Http3ServerConnectionHandler} or {@link Http3ClientConnectionHandler} to make http/3 connections * backward-compatible with {@link ChannelHandler}s expecting {@link HttpObject}. - * + *

* For simplicity, it converts to chunked encoding unless the entire stream * is a single header. */ @@ -148,7 +148,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) return; } else { throw new EncoderException( - HttpResponseStatus.CONTINUE.toString() + " must be a FullHttpResponse"); + HttpResponseStatus.CONTINUE + " must be a FullHttpResponse"); } } } @@ -187,18 +187,20 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) Http3Headers headers = HttpConversionUtil.toHttp3Headers(last.trailingHeaders(), validateHeaders); promise = writeWithOptionalCombiner(ctx, new DefaultHttp3HeadersFrame(headers), promise, combiner, true); - } - if (!readable) { + } else if (!readable) { + // Release the data and just use EMPTY_BUFFER. This might allow us to give back memory to the allocator + // faster. last.release(); + if (combiner == null) { + // We only need to write something if there was no write before. + promise = writeWithOptionalCombiner(ctx, + new DefaultHttp3DataFrame(Unpooled.EMPTY_BUFFER), promise, combiner, true); + } } - - if (!readable && !hasTrailers && combiner == null) { - // we had to write nothing. happy days! - ((QuicStreamChannel) ctx.channel()).shutdownOutput(); - promise.trySuccess(); - } else { - promise.addListener(QuicStreamChannel.SHUTDOWN_OUTPUT); - } + // The shutdown is always done via the listener to ensure previous written data is correctly drained + // before QuicStreamChannel.shutdownOutput() is called. Missing to do so might cause previous queued data + // to be failed with a ClosedChannelException. + promise.addListener(QuicStreamChannel.SHUTDOWN_OUTPUT); } else if (msg instanceof HttpContent) { promise = writeWithOptionalCombiner(ctx, new DefaultHttp3DataFrame(((HttpContent) msg).content()), promise, combiner, false); diff --git a/src/test/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodecTest.java b/src/test/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodecTest.java index 8646bfb..d061f20 100644 --- a/src/test/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodecTest.java +++ b/src/test/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodecTest.java @@ -61,6 +61,11 @@ import io.netty.incubator.codec.quic.QuicStreamChannel; import io.netty.util.CharsetUtil; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ArgumentsProvider; +import org.junit.jupiter.params.provider.ArgumentsSource; import java.nio.CharBuffer; import java.nio.charset.StandardCharsets; @@ -71,6 +76,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.nullValue; @@ -221,6 +227,13 @@ public void testUpgradeEmptyEnd() { ch.writeOutbound(LastHttpContent.EMPTY_LAST_CONTENT); assertTrue(ch.isOutputShutdown()); + Http3DataFrame dataFrame = ch.readOutbound(); + try { + assertThat(dataFrame.content().readableBytes(), is(0)); + } finally { + dataFrame.release(); + } + assertFalse(ch.finish()); } @@ -510,6 +523,13 @@ public void testEncodeEmptyEndAsClient() { ch.writeOutbound(LastHttpContent.EMPTY_LAST_CONTENT); assertTrue(ch.isOutputShutdown()); + Http3DataFrame dataFrame = ch.readOutbound(); + try { + assertThat(dataFrame.content().readableBytes(), is(0)); + } finally { + dataFrame.release(); + } + assertFalse(ch.finish()); } @@ -606,6 +626,13 @@ public void testEncodeEmptyLastPromiseCompletes() { assertThat(headers.path().toString(), is("/hello/world")); assertTrue(ch.isOutputShutdown()); + Http3DataFrame dataFrame = ch.readOutbound(); + try { + assertThat(dataFrame.content().readableBytes(), is(0)); + } finally { + dataFrame.release(); + } + assertFalse(ch.finish()); } @@ -684,31 +711,30 @@ public void testEncodeVoidPromise() { assertFalse(ch.finish()); } - @Test - public void testEncodeCombinations() { - // this test goes through all the branches of Http3FrameToHttpObjectCodec and ensures right functionality - - for (boolean headers : new boolean[]{false, true}) { - for (boolean last : new boolean[]{false, true}) { - for (boolean nonEmptyContent : new boolean[]{false, true}) { - for (boolean hasTrailers : new boolean[]{false, true}) { - for (boolean voidPromise : new boolean[]{false, true}) { - testEncodeCombination(headers, last, nonEmptyContent, hasTrailers, voidPromise); + private static final class EncodeCombinationsArgumentsProvider implements ArgumentsProvider { + @Override + public Stream provideArguments(ExtensionContext extensionContext) { + List arguments = new ArrayList<>(); + for (boolean headers : new boolean[]{false, true}) { + for (boolean last : new boolean[]{false, true}) { + for (boolean nonEmptyContent : new boolean[]{false, true}) { + for (boolean hasTrailers : new boolean[]{false, true}) { + for (boolean voidPromise : new boolean[]{false, true}) { + // this test goes through all the branches of Http3FrameToHttpObjectCodec + // and ensures right functionality + arguments.add(Arguments.of(headers, last, nonEmptyContent, hasTrailers, voidPromise)); + } } } } } + return arguments.stream(); } } - /** - * @param headers Should this be an initial message, with headers ({@link HttpRequest})? - * @param last Should this be a last message ({@link LastHttpContent})? - * @param nonEmptyContent Should this message have non-empty content? - * @param hasTrailers Should this {@code last} message have trailers? - * @param voidPromise Should the write operation use a void promise? - */ - private static void testEncodeCombination( + @ParameterizedTest(name = "headers: {0}, last: {1}, nonEmptyContent: {2}, hasTrailers: {3}, voidPromise: {4}") + @ArgumentsSource(value = EncodeCombinationsArgumentsProvider.class) + public void testEncodeCombination( boolean headers, boolean last, boolean nonEmptyContent, @@ -772,31 +798,28 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) Http3DataFrame dataFrame = ch.readOutbound(); assertThat(dataFrame.content().readableBytes(), is(1)); dataFrame.release(); - } else if (!headers && !hasTrailers && !last) { - ch.readOutbound().release(); } if (hasTrailers) { Http3HeadersFrame trailersFrame = ch.readOutbound(); assertThat(trailersFrame.headers().get("foo"), is("bar")); + } else if (!nonEmptyContent && !headers) { + Http3DataFrame dataFrame = ch.readOutbound(); + assertThat(dataFrame.content().readableBytes(), is(0)); + dataFrame.release(); } - // empty LastHttpContent has no data written and will complete the promise immediately - boolean anyData = hasTrailers || nonEmptyContent || headers || !last; + if (!voidPromise) { - if (anyData) { - assertFalse(fullPromise.isDone()); - } else { - // nothing to write, immediately complete - assertTrue(fullPromise.isDone()); - } - } - if (!last || anyData) { - assertFalse(ch.isOutputShutdown()); + assertFalse(fullPromise.isDone()); } + + assertFalse(ch.isOutputShutdown()); for (ChannelPromise framePromise : framePromises) { framePromise.trySuccess(); } if (last) { assertTrue(ch.isOutputShutdown()); + } else { + assertFalse(ch.isOutputShutdown()); } if (!voidPromise) { assertTrue(fullPromise.isDone()); From edb7763561589ff53a20df848270cb5144b70f95 Mon Sep 17 00:00:00 2001 From: Norman Maurer Date: Tue, 16 Jan 2024 10:20:32 +0100 Subject: [PATCH 2/2] Always correctly release --- .../http3/Http3FrameToHttpObjectCodec.java | 50 ++++++++++--------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/src/main/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodec.java b/src/main/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodec.java index aa2deb3..c7b5058 100644 --- a/src/main/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodec.java +++ b/src/main/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodec.java @@ -172,35 +172,37 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) if (isLast) { LastHttpContent last = (LastHttpContent) msg; - boolean readable = last.content().isReadable(); - boolean hasTrailers = !last.trailingHeaders().isEmpty(); + try { + boolean readable = last.content().isReadable(); + boolean hasTrailers = !last.trailingHeaders().isEmpty(); - if (combiner == null && readable && hasTrailers && !promise.isVoid()) { - combiner = new PromiseCombiner(ctx.executor()); - } + if (combiner == null && readable && hasTrailers && !promise.isVoid()) { + combiner = new PromiseCombiner(ctx.executor()); + } - if (readable) { - promise = writeWithOptionalCombiner(ctx, - new DefaultHttp3DataFrame(last.content()), promise, combiner, true); - } - if (hasTrailers) { - Http3Headers headers = HttpConversionUtil.toHttp3Headers(last.trailingHeaders(), validateHeaders); - promise = writeWithOptionalCombiner(ctx, - new DefaultHttp3HeadersFrame(headers), promise, combiner, true); - } else if (!readable) { - // Release the data and just use EMPTY_BUFFER. This might allow us to give back memory to the allocator - // faster. - last.release(); - if (combiner == null) { - // We only need to write something if there was no write before. + if (readable) { + promise = writeWithOptionalCombiner( + ctx, new DefaultHttp3DataFrame(last.content().retain()), promise, combiner, true); + } + if (hasTrailers) { + Http3Headers headers = HttpConversionUtil.toHttp3Headers(last.trailingHeaders(), validateHeaders); promise = writeWithOptionalCombiner(ctx, - new DefaultHttp3DataFrame(Unpooled.EMPTY_BUFFER), promise, combiner, true); + new DefaultHttp3HeadersFrame(headers), promise, combiner, true); + } else if (!readable) { + if (combiner == null) { + // We only need to write something if there was no write before. + promise = writeWithOptionalCombiner( + ctx, new DefaultHttp3DataFrame(last.content().retain()), promise, combiner, true); + } } + // The shutdown is always done via the listener to ensure previous written data is correctly drained + // before QuicStreamChannel.shutdownOutput() is called. Missing to do so might cause previous queued + // data to be failed with a ClosedChannelException. + promise = promise.unvoid().addListener(QuicStreamChannel.SHUTDOWN_OUTPUT); + } finally { + // Release LastHttpContent, we retain the content if we need it. + last.release(); } - // The shutdown is always done via the listener to ensure previous written data is correctly drained - // before QuicStreamChannel.shutdownOutput() is called. Missing to do so might cause previous queued data - // to be failed with a ClosedChannelException. - promise.addListener(QuicStreamChannel.SHUTDOWN_OUTPUT); } else if (msg instanceof HttpContent) { promise = writeWithOptionalCombiner(ctx, new DefaultHttp3DataFrame(((HttpContent) msg).content()), promise, combiner, false);