307 lines
15 KiB
Diff
307 lines
15 KiB
Diff
|
From c2fdcf7d9785ae386562f37174d8378b96a58c5d Mon Sep 17 00:00:00 2001
|
||
|
From: Norman Maurer <norman_maurer@apple.com>
|
||
|
Date: Tue, 16 Jan 2024 08:59:49 +0100
|
||
|
Subject: [PATCH 1/2] Ensure QuicStreamChannel.shutdownOutput() is only called
|
||
|
once all previous writes were processed.
|
||
|
|
||
|
Motivation:
|
||
|
We need to ensure QuicStreamChannel.shutdownOutput() is only called once all previous writes were processed. This is necessary as otherwise shutdownOutput() might be called while some writes are still queued (due flowcontrol).
|
||
|
|
||
|
Modifications:
|
||
|
- Always do the shutdownOutput() via a ChannelFutureListener
|
||
|
- Adjust tests
|
||
|
|
||
|
Result:
|
||
|
Always drain write queue first before shutdown the output.
|
||
|
---
|
||
|
.../http3/Http3FrameToHttpObjectCodec.java | 26 +++---
|
||
|
.../Http3FrameToHttpObjectCodecTest.java | 85 ++++++++++++-------
|
||
|
2 files changed, 68 insertions(+), 43 deletions(-)
|
||
|
|
||
|
diff --git a/src/main/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodec.java b/src/main/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodec.java
|
||
|
index 5d22891..aa2deb3 100644
|
||
|
--- a/src/main/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodec.java
|
||
|
+++ b/src/main/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodec.java
|
||
|
@@ -50,7 +50,7 @@
|
||
|
* and back. It can be used as an adapter in conjunction with {@link
|
||
|
* Http3ServerConnectionHandler} or {@link Http3ClientConnectionHandler} to make http/3 connections
|
||
|
* backward-compatible with {@link ChannelHandler}s expecting {@link HttpObject}.
|
||
|
- *
|
||
|
+ * <p>
|
||
|
* For simplicity, it converts to chunked encoding unless the entire stream
|
||
|
* is a single header.
|
||
|
*/
|
||
|
@@ -148,7 +148,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
|
||
|
return;
|
||
|
} else {
|
||
|
throw new EncoderException(
|
||
|
- HttpResponseStatus.CONTINUE.toString() + " must be a FullHttpResponse");
|
||
|
+ HttpResponseStatus.CONTINUE + " must be a FullHttpResponse");
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
@@ -187,18 +187,20 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
|
||
|
Http3Headers headers = HttpConversionUtil.toHttp3Headers(last.trailingHeaders(), validateHeaders);
|
||
|
promise = writeWithOptionalCombiner(ctx,
|
||
|
new DefaultHttp3HeadersFrame(headers), promise, combiner, true);
|
||
|
- }
|
||
|
- if (!readable) {
|
||
|
+ } else if (!readable) {
|
||
|
+ // Release the data and just use EMPTY_BUFFER. This might allow us to give back memory to the allocator
|
||
|
+ // faster.
|
||
|
last.release();
|
||
|
+ if (combiner == null) {
|
||
|
+ // We only need to write something if there was no write before.
|
||
|
+ promise = writeWithOptionalCombiner(ctx,
|
||
|
+ new DefaultHttp3DataFrame(Unpooled.EMPTY_BUFFER), promise, combiner, true);
|
||
|
+ }
|
||
|
}
|
||
|
-
|
||
|
- if (!readable && !hasTrailers && combiner == null) {
|
||
|
- // we had to write nothing. happy days!
|
||
|
- ((QuicStreamChannel) ctx.channel()).shutdownOutput();
|
||
|
- promise.trySuccess();
|
||
|
- } else {
|
||
|
- promise.addListener(QuicStreamChannel.SHUTDOWN_OUTPUT);
|
||
|
- }
|
||
|
+ // The shutdown is always done via the listener to ensure previous written data is correctly drained
|
||
|
+ // before QuicStreamChannel.shutdownOutput() is called. Missing to do so might cause previous queued data
|
||
|
+ // to be failed with a ClosedChannelException.
|
||
|
+ promise.addListener(QuicStreamChannel.SHUTDOWN_OUTPUT);
|
||
|
} else if (msg instanceof HttpContent) {
|
||
|
promise = writeWithOptionalCombiner(ctx,
|
||
|
new DefaultHttp3DataFrame(((HttpContent) msg).content()), promise, combiner, false);
|
||
|
diff --git a/src/test/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodecTest.java b/src/test/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodecTest.java
|
||
|
index 8646bfb..d061f20 100644
|
||
|
--- a/src/test/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodecTest.java
|
||
|
+++ b/src/test/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodecTest.java
|
||
|
@@ -61,6 +61,11 @@
|
||
|
import io.netty.incubator.codec.quic.QuicStreamChannel;
|
||
|
import io.netty.util.CharsetUtil;
|
||
|
import org.junit.jupiter.api.Test;
|
||
|
+import org.junit.jupiter.api.extension.ExtensionContext;
|
||
|
+import org.junit.jupiter.params.ParameterizedTest;
|
||
|
+import org.junit.jupiter.params.provider.Arguments;
|
||
|
+import org.junit.jupiter.params.provider.ArgumentsProvider;
|
||
|
+import org.junit.jupiter.params.provider.ArgumentsSource;
|
||
|
|
||
|
import java.nio.CharBuffer;
|
||
|
import java.nio.charset.StandardCharsets;
|
||
|
@@ -71,6 +76,7 @@
|
||
|
import java.util.concurrent.ExecutionException;
|
||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||
|
import java.util.concurrent.TimeUnit;
|
||
|
+import java.util.stream.Stream;
|
||
|
|
||
|
import static org.hamcrest.CoreMatchers.is;
|
||
|
import static org.hamcrest.CoreMatchers.nullValue;
|
||
|
@@ -221,6 +227,13 @@ public void testUpgradeEmptyEnd() {
|
||
|
ch.writeOutbound(LastHttpContent.EMPTY_LAST_CONTENT);
|
||
|
|
||
|
assertTrue(ch.isOutputShutdown());
|
||
|
+ Http3DataFrame dataFrame = ch.readOutbound();
|
||
|
+ try {
|
||
|
+ assertThat(dataFrame.content().readableBytes(), is(0));
|
||
|
+ } finally {
|
||
|
+ dataFrame.release();
|
||
|
+ }
|
||
|
+
|
||
|
assertFalse(ch.finish());
|
||
|
}
|
||
|
|
||
|
@@ -510,6 +523,13 @@ public void testEncodeEmptyEndAsClient() {
|
||
|
ch.writeOutbound(LastHttpContent.EMPTY_LAST_CONTENT);
|
||
|
|
||
|
assertTrue(ch.isOutputShutdown());
|
||
|
+ Http3DataFrame dataFrame = ch.readOutbound();
|
||
|
+ try {
|
||
|
+ assertThat(dataFrame.content().readableBytes(), is(0));
|
||
|
+ } finally {
|
||
|
+ dataFrame.release();
|
||
|
+ }
|
||
|
+
|
||
|
assertFalse(ch.finish());
|
||
|
}
|
||
|
|
||
|
@@ -606,6 +626,13 @@ public void testEncodeEmptyLastPromiseCompletes() {
|
||
|
assertThat(headers.path().toString(), is("/hello/world"));
|
||
|
assertTrue(ch.isOutputShutdown());
|
||
|
|
||
|
+ Http3DataFrame dataFrame = ch.readOutbound();
|
||
|
+ try {
|
||
|
+ assertThat(dataFrame.content().readableBytes(), is(0));
|
||
|
+ } finally {
|
||
|
+ dataFrame.release();
|
||
|
+ }
|
||
|
+
|
||
|
assertFalse(ch.finish());
|
||
|
}
|
||
|
|
||
|
@@ -684,31 +711,30 @@ public void testEncodeVoidPromise() {
|
||
|
assertFalse(ch.finish());
|
||
|
}
|
||
|
|
||
|
- @Test
|
||
|
- public void testEncodeCombinations() {
|
||
|
- // this test goes through all the branches of Http3FrameToHttpObjectCodec and ensures right functionality
|
||
|
-
|
||
|
- for (boolean headers : new boolean[]{false, true}) {
|
||
|
- for (boolean last : new boolean[]{false, true}) {
|
||
|
- for (boolean nonEmptyContent : new boolean[]{false, true}) {
|
||
|
- for (boolean hasTrailers : new boolean[]{false, true}) {
|
||
|
- for (boolean voidPromise : new boolean[]{false, true}) {
|
||
|
- testEncodeCombination(headers, last, nonEmptyContent, hasTrailers, voidPromise);
|
||
|
+ private static final class EncodeCombinationsArgumentsProvider implements ArgumentsProvider {
|
||
|
+ @Override
|
||
|
+ public Stream<? extends Arguments> provideArguments(ExtensionContext extensionContext) {
|
||
|
+ List<Arguments> arguments = new ArrayList<>();
|
||
|
+ for (boolean headers : new boolean[]{false, true}) {
|
||
|
+ for (boolean last : new boolean[]{false, true}) {
|
||
|
+ for (boolean nonEmptyContent : new boolean[]{false, true}) {
|
||
|
+ for (boolean hasTrailers : new boolean[]{false, true}) {
|
||
|
+ for (boolean voidPromise : new boolean[]{false, true}) {
|
||
|
+ // this test goes through all the branches of Http3FrameToHttpObjectCodec
|
||
|
+ // and ensures right functionality
|
||
|
+ arguments.add(Arguments.of(headers, last, nonEmptyContent, hasTrailers, voidPromise));
|
||
|
+ }
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
+ return arguments.stream();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
- /**
|
||
|
- * @param headers Should this be an initial message, with headers ({@link HttpRequest})?
|
||
|
- * @param last Should this be a last message ({@link LastHttpContent})?
|
||
|
- * @param nonEmptyContent Should this message have non-empty content?
|
||
|
- * @param hasTrailers Should this {@code last} message have trailers?
|
||
|
- * @param voidPromise Should the write operation use a void promise?
|
||
|
- */
|
||
|
- private static void testEncodeCombination(
|
||
|
+ @ParameterizedTest(name = "headers: {0}, last: {1}, nonEmptyContent: {2}, hasTrailers: {3}, voidPromise: {4}")
|
||
|
+ @ArgumentsSource(value = EncodeCombinationsArgumentsProvider.class)
|
||
|
+ public void testEncodeCombination(
|
||
|
boolean headers,
|
||
|
boolean last,
|
||
|
boolean nonEmptyContent,
|
||
|
@@ -772,31 +798,28 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
|
||
|
Http3DataFrame dataFrame = ch.readOutbound();
|
||
|
assertThat(dataFrame.content().readableBytes(), is(1));
|
||
|
dataFrame.release();
|
||
|
- } else if (!headers && !hasTrailers && !last) {
|
||
|
- ch.<Http3DataFrame>readOutbound().release();
|
||
|
}
|
||
|
if (hasTrailers) {
|
||
|
Http3HeadersFrame trailersFrame = ch.readOutbound();
|
||
|
assertThat(trailersFrame.headers().get("foo"), is("bar"));
|
||
|
+ } else if (!nonEmptyContent && !headers) {
|
||
|
+ Http3DataFrame dataFrame = ch.readOutbound();
|
||
|
+ assertThat(dataFrame.content().readableBytes(), is(0));
|
||
|
+ dataFrame.release();
|
||
|
}
|
||
|
- // empty LastHttpContent has no data written and will complete the promise immediately
|
||
|
- boolean anyData = hasTrailers || nonEmptyContent || headers || !last;
|
||
|
+
|
||
|
if (!voidPromise) {
|
||
|
- if (anyData) {
|
||
|
- assertFalse(fullPromise.isDone());
|
||
|
- } else {
|
||
|
- // nothing to write, immediately complete
|
||
|
- assertTrue(fullPromise.isDone());
|
||
|
- }
|
||
|
- }
|
||
|
- if (!last || anyData) {
|
||
|
- assertFalse(ch.isOutputShutdown());
|
||
|
+ assertFalse(fullPromise.isDone());
|
||
|
}
|
||
|
+
|
||
|
+ assertFalse(ch.isOutputShutdown());
|
||
|
for (ChannelPromise framePromise : framePromises) {
|
||
|
framePromise.trySuccess();
|
||
|
}
|
||
|
if (last) {
|
||
|
assertTrue(ch.isOutputShutdown());
|
||
|
+ } else {
|
||
|
+ assertFalse(ch.isOutputShutdown());
|
||
|
}
|
||
|
if (!voidPromise) {
|
||
|
assertTrue(fullPromise.isDone());
|
||
|
|
||
|
From edb7763561589ff53a20df848270cb5144b70f95 Mon Sep 17 00:00:00 2001
|
||
|
From: Norman Maurer <norman_maurer@apple.com>
|
||
|
Date: Tue, 16 Jan 2024 10:20:32 +0100
|
||
|
Subject: [PATCH 2/2] Always correctly release
|
||
|
|
||
|
---
|
||
|
.../http3/Http3FrameToHttpObjectCodec.java | 50 ++++++++++---------
|
||
|
1 file changed, 26 insertions(+), 24 deletions(-)
|
||
|
|
||
|
diff --git a/src/main/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodec.java b/src/main/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodec.java
|
||
|
index aa2deb3..c7b5058 100644
|
||
|
--- a/src/main/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodec.java
|
||
|
+++ b/src/main/java/io/netty/incubator/codec/http3/Http3FrameToHttpObjectCodec.java
|
||
|
@@ -172,35 +172,37 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
|
||
|
|
||
|
if (isLast) {
|
||
|
LastHttpContent last = (LastHttpContent) msg;
|
||
|
- boolean readable = last.content().isReadable();
|
||
|
- boolean hasTrailers = !last.trailingHeaders().isEmpty();
|
||
|
+ try {
|
||
|
+ boolean readable = last.content().isReadable();
|
||
|
+ boolean hasTrailers = !last.trailingHeaders().isEmpty();
|
||
|
|
||
|
- if (combiner == null && readable && hasTrailers && !promise.isVoid()) {
|
||
|
- combiner = new PromiseCombiner(ctx.executor());
|
||
|
- }
|
||
|
+ if (combiner == null && readable && hasTrailers && !promise.isVoid()) {
|
||
|
+ combiner = new PromiseCombiner(ctx.executor());
|
||
|
+ }
|
||
|
|
||
|
- if (readable) {
|
||
|
- promise = writeWithOptionalCombiner(ctx,
|
||
|
- new DefaultHttp3DataFrame(last.content()), promise, combiner, true);
|
||
|
- }
|
||
|
- if (hasTrailers) {
|
||
|
- Http3Headers headers = HttpConversionUtil.toHttp3Headers(last.trailingHeaders(), validateHeaders);
|
||
|
- promise = writeWithOptionalCombiner(ctx,
|
||
|
- new DefaultHttp3HeadersFrame(headers), promise, combiner, true);
|
||
|
- } else if (!readable) {
|
||
|
- // Release the data and just use EMPTY_BUFFER. This might allow us to give back memory to the allocator
|
||
|
- // faster.
|
||
|
- last.release();
|
||
|
- if (combiner == null) {
|
||
|
- // We only need to write something if there was no write before.
|
||
|
+ if (readable) {
|
||
|
+ promise = writeWithOptionalCombiner(
|
||
|
+ ctx, new DefaultHttp3DataFrame(last.content().retain()), promise, combiner, true);
|
||
|
+ }
|
||
|
+ if (hasTrailers) {
|
||
|
+ Http3Headers headers = HttpConversionUtil.toHttp3Headers(last.trailingHeaders(), validateHeaders);
|
||
|
promise = writeWithOptionalCombiner(ctx,
|
||
|
- new DefaultHttp3DataFrame(Unpooled.EMPTY_BUFFER), promise, combiner, true);
|
||
|
+ new DefaultHttp3HeadersFrame(headers), promise, combiner, true);
|
||
|
+ } else if (!readable) {
|
||
|
+ if (combiner == null) {
|
||
|
+ // We only need to write something if there was no write before.
|
||
|
+ promise = writeWithOptionalCombiner(
|
||
|
+ ctx, new DefaultHttp3DataFrame(last.content().retain()), promise, combiner, true);
|
||
|
+ }
|
||
|
}
|
||
|
+ // The shutdown is always done via the listener to ensure previous written data is correctly drained
|
||
|
+ // before QuicStreamChannel.shutdownOutput() is called. Missing to do so might cause previous queued
|
||
|
+ // data to be failed with a ClosedChannelException.
|
||
|
+ promise = promise.unvoid().addListener(QuicStreamChannel.SHUTDOWN_OUTPUT);
|
||
|
+ } finally {
|
||
|
+ // Release LastHttpContent, we retain the content if we need it.
|
||
|
+ last.release();
|
||
|
}
|
||
|
- // The shutdown is always done via the listener to ensure previous written data is correctly drained
|
||
|
- // before QuicStreamChannel.shutdownOutput() is called. Missing to do so might cause previous queued data
|
||
|
- // to be failed with a ClosedChannelException.
|
||
|
- promise.addListener(QuicStreamChannel.SHUTDOWN_OUTPUT);
|
||
|
} else if (msg instanceof HttpContent) {
|
||
|
promise = writeWithOptionalCombiner(ctx,
|
||
|
new DefaultHttp3DataFrame(((HttpContent) msg).content()), promise, combiner, false);
|