From 1352fa5543638d421042f1d69c8fc2c6f4603968 Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Thu, 11 Jan 2024 11:24:07 -0800 Subject: [PATCH] h2: propagate stream close without read pending, avoid SOOE if autoRead == false Motivation: AbstractHttp2StreamChannel requires that there is a channel.read() pending in order to notify of channel inactive events. For use cases that don't use auto read, the application could have read the frame corresponding to end of stream but never receive the channel inactive event. If read is called from within channelReadComplete this creates a reentrant loop into AbstractHttp2StreamChannel#doBeginRead that may cause a StackOverflowError. Modifications: - propagate stream/channel closed event if possible (e.g. no pending data) without invoking read(). - adjust readPending state to prevent StackOverflowError and let the read loop continue. --- .../http2/AbstractHttp2StreamChannel.java | 84 +++++++++++-------- .../codec/http2/Http2MultiplexTest.java | 73 +++++++++++++++- 2 files changed, 121 insertions(+), 36 deletions(-) diff --git a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java index 49882acf1ad7..c768576d2aea 100644 --- a/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java +++ b/codec-http2/src/main/java/io/netty/handler/codec/http2/AbstractHttp2StreamChannel.java @@ -605,7 +605,7 @@ void fireChildRead(Http2Frame frame) { if (allocHandle.continueReading()) { maybeAddChannelToReadCompletePendingQueue(); } else { - unsafe.notifyReadComplete(allocHandle, true); + unsafe.notifyReadComplete(allocHandle, true, false); } } else { if (inboundBuffer == null) { @@ -618,7 +618,7 @@ void fireChildRead(Http2Frame frame) { void fireChildReadComplete() { assert eventLoop().inEventLoop(); assert readStatus != ReadStatus.IDLE || !readCompletePending; -
unsafe.notifyReadComplete(unsafe.recvBufAllocHandle(), false); + unsafe.notifyReadComplete(unsafe.recvBufAllocHandle(), false, false); } final void closeWithError(Http2Error error) { @@ -851,35 +851,47 @@ private Object pollQueuedMessage() { } void doBeginRead() { - // Process messages until there are none left (or the user stopped requesting) and also handle EOS. - while (readStatus != ReadStatus.IDLE) { - Object message = pollQueuedMessage(); - if (message == null) { - if (readEOS) { - unsafe.closeForcibly(); - } - // We need to double check that there is nothing left to flush such as a - // window update frame. + if (readStatus == ReadStatus.IDLE) { + // Don't wait for the user to request a read to notify of channel closure. + if (readEOS && (inboundBuffer == null || inboundBuffer.isEmpty())) { + // Double check there is nothing left to flush such as a window update frame. flush(); - break; - } - final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); - allocHandle.reset(config()); - boolean continueReading = false; - do { - doRead0((Http2Frame) message, allocHandle); - } while ((readEOS || (continueReading = allocHandle.continueReading())) - && (message = pollQueuedMessage()) != null); - - if (continueReading && isParentReadInProgress() && !readEOS) { - // Currently the parent and child channel are on the same EventLoop thread. If the parent is - // currently reading it is possible that more frames will be delivered to this child channel. In - // the case that this child channel still wants to read we delay the channelReadComplete on this - // child channel until the parent is done reading. - maybeAddChannelToReadCompletePendingQueue(); - } else { - notifyReadComplete(allocHandle, true); + unsafe.closeForcibly(); } + } else { + do { // Process messages until there are none left (or the user stopped requesting) and also handle EOS. 
+ Object message = pollQueuedMessage(); + if (message == null) { + // Double check there is nothing left to flush such as a window update frame. + flush(); + if (readEOS) { + unsafe.closeForcibly(); + } + break; + } + final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); + allocHandle.reset(config()); + boolean continueReading = false; + do { + doRead0((Http2Frame) message, allocHandle); + } while ((readEOS || (continueReading = allocHandle.continueReading())) + && (message = pollQueuedMessage()) != null); + + if (continueReading && isParentReadInProgress() && !readEOS) { + // Currently the parent and child channel are on the same EventLoop thread. If the parent is + // currently reading it is possible that more frames will be delivered to this child channel. In + // the case that this child channel still wants to read we delay the channelReadComplete on this + // child channel until the parent is done reading. + maybeAddChannelToReadCompletePendingQueue(); + } else { + notifyReadComplete(allocHandle, true, true); + + // While in the read loop reset the readStatus AFTER calling readComplete (or other pipeline + // callbacks) to prevent re-entry into this method (if autoRead is disabled and the user calls + // read on each readComplete) and a StackOverflowError. + resetReadStatus(); + } + } while (readStatus != ReadStatus.IDLE); } } @@ -908,17 +920,21 @@ private void updateLocalWindowIfNeeded() { } } - void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle, boolean forceReadComplete) { + private void resetReadStatus() { + readStatus = readStatus == ReadStatus.REQUESTED ? ReadStatus.IN_PROGRESS : ReadStatus.IDLE; + } + + void notifyReadComplete(RecvByteBufAllocator.Handle allocHandle, boolean forceReadComplete, + boolean inReadLoop) { if (!readCompletePending && !forceReadComplete) { return; } // Set to false just in case we added the channel multiple times before.
readCompletePending = false; - if (readStatus == ReadStatus.REQUESTED) { - readStatus = ReadStatus.IN_PROGRESS; - } else { - readStatus = ReadStatus.IDLE; + if (!inReadLoop) { + // While in the read loop we reset the state after calling pipeline methods to prevent StackOverflow. + resetReadStatus(); } allocHandle.readComplete(); diff --git a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTest.java b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTest.java index f277dbb12ef1..b1f28b14b94b 100644 --- a/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTest.java +++ b/codec-http2/src/test/java/io/netty/handler/codec/http2/Http2MultiplexTest.java @@ -58,6 +58,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import static io.netty.handler.codec.http2.Http2Error.NO_ERROR; import static io.netty.handler.codec.http2.Http2TestUtil.anyChannelPromise; import static io.netty.handler.codec.http2.Http2TestUtil.anyHttp2Settings; import static io.netty.handler.codec.http2.Http2TestUtil.assertEqualsAndRelease; @@ -477,6 +478,74 @@ public void channelReadShouldRespectAutoRead() { verifyFramesMultiplexedToCorrectChannel(childChannel, inboundHandler, 2); } + @Test + public void noAutoReadWithReentrantReadDoesNotSOOE() { + final AtomicBoolean shouldRead = new AtomicBoolean(); + Consumer ctxConsumer = new Consumer() { + @Override + public void accept(ChannelHandlerContext obj) { + if (shouldRead.get()) { + obj.read(); + } + } + }; + LastInboundHandler inboundHandler = new LastInboundHandler(ctxConsumer); + AtomicInteger maxReads = new AtomicInteger(1); + Http2StreamChannel childChannel = newInboundStream(3, false, maxReads, inboundHandler); + assertTrue(childChannel.config().isAutoRead()); + Http2HeadersFrame headersFrame = inboundHandler.readInbound(); + assertNotNull(headersFrame); + + childChannel.config().setAutoRead(false); + + final int maxWrites = 10000; // 
enough writes to generate SOOE. + for (int i = 0; i < maxWrites; ++i) { + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb(String.valueOf(i)), 0, false); + } + frameInboundWriter.writeInboundData(childChannel.stream().id(), bb(String.valueOf(maxWrites)), 0, true); + shouldRead.set(true); + childChannel.read(); + + for (int i = 0; i < maxWrites; ++i) { + Http2DataFrame dataFrame0 = inboundHandler.readInbound(); + assertNotNull(dataFrame0); + release(dataFrame0); + } + Http2DataFrame dataFrame0 = inboundHandler.readInbound(); + assertTrue(dataFrame0.isEndStream()); + release(dataFrame0); + + verifyFramesMultiplexedToCorrectChannel(childChannel, inboundHandler, 0); + } + + @Test + public void readNotRequiredToEndStream() { + LastInboundHandler inboundHandler = new LastInboundHandler(); + AtomicInteger maxReads = new AtomicInteger(1); + Http2StreamChannel childChannel = newInboundStream(3, false, maxReads, inboundHandler); + assertTrue(childChannel.config().isAutoRead()); + + childChannel.config().setAutoRead(false); + + Http2HeadersFrame headersFrame = inboundHandler.readInbound(); + assertNotNull(headersFrame); + + assertNull(inboundHandler.readInbound()); + + frameInboundWriter.writeInboundRstStream(childChannel.stream().id(), NO_ERROR.code()); + + assertFalse(inboundHandler.isChannelActive()); + childChannel.closeFuture().syncUninterruptibly(); + + Http2ResetFrame resetFrame = useUserEventForResetFrame() ?
inboundHandler.readUserEvent() : + inboundHandler.readInbound(); + + assertEquals(childChannel.stream(), resetFrame.stream()); + assertEquals(NO_ERROR.code(), resetFrame.errorCode()); + + verifyFramesMultiplexedToCorrectChannel(childChannel, inboundHandler, 0); + } + @Test public void channelReadShouldRespectAutoReadAndNotProduceNPE() throws Exception { LastInboundHandler inboundHandler = new LastInboundHandler(); @@ -1095,7 +1164,7 @@ public void channelReadComplete(ChannelHandlerContext ctx) { childChannel.config().setAutoRead(true); numReads.set(1); - frameInboundWriter.writeInboundRstStream(childChannel.stream().id(), Http2Error.NO_ERROR.code()); + frameInboundWriter.writeInboundRstStream(childChannel.stream().id(), NO_ERROR.code()); // Detecting EOS should flush all pending data regardless of read calls. assertEqualsAndRelease(dataFrame2, inboundHandler.readInbound()); @@ -1111,7 +1180,7 @@ public void channelReadComplete(ChannelHandlerContext ctx) { inboundHandler.readInbound(); assertEquals(childChannel.stream(), resetFrame.stream()); - assertEquals(Http2Error.NO_ERROR.code(), resetFrame.errorCode()); + assertEquals(NO_ERROR.code(), resetFrame.errorCode()); assertNull(inboundHandler.readInbound());