From bd2658601d5fe8b3c255be48ccf9050853312d94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Fri, 17 Dec 2021 16:29:05 +0100 Subject: [PATCH] fix pipelining request release --- gradle.properties | 2 +- .../netty/http/server/api/ServerConfig.java | 6 +++ .../netty/http/server/api/ServerResponse.java | 2 +- .../http/server/DefaultServerConfig.java | 11 ++++ .../org/xbib/netty/http/server/Server.java | 10 ++++ .../http1/Http1ChannelInitializer.java | 22 ++++++-- .../protocol/http1/HttpPipelinedRequest.java | 48 ++++++++++++++++- .../protocol/http1/HttpPipelinedResponse.java | 51 +++++++++++++++++-- .../protocol/http1/HttpPipeliningHandler.java | 26 ++++++++-- .../protocol/http1/HttpServerResponse.java | 2 +- .../protocol/http2/Http2ServerResponse.java | 2 +- .../test/hacks/HttpPipeliningHandlerTest.java | 19 +++---- .../http/server/test/http1/CleartextTest.java | 16 +++--- .../http/server/test/http1/StreamTest.java | 2 +- .../http/server/test/http2/StreamTest.java | 2 +- 15 files changed, 187 insertions(+), 34 deletions(-) diff --git a/gradle.properties b/gradle.properties index 0eeeb97..a674653 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group = org.xbib name = netty-http -version = 4.1.72.0 +version = 4.1.72.1 org.gradle.warning.mode = ALL gradle.wrapper.version = 7.3 diff --git a/netty-http-server-api/src/main/java/org/xbib/netty/http/server/api/ServerConfig.java b/netty-http-server-api/src/main/java/org/xbib/netty/http/server/api/ServerConfig.java index 3bcf6fa..b7a56ec 100644 --- a/netty-http-server-api/src/main/java/org/xbib/netty/http/server/api/ServerConfig.java +++ b/netty-http-server-api/src/main/java/org/xbib/netty/http/server/api/ServerConfig.java @@ -68,6 +68,8 @@ public interface ServerConfig { boolean isDecompressionEnabled(); + boolean isPipeliningEnabled(); + boolean isInstallHttp2Upgrade(); Http2Settings getHttp2Settings(); @@ -200,6 +202,10 @@ public interface ServerConfig { */ int MAX_CONTENT_LENGTH = 256 * 1024 * 1024; + /** + * HTTP/1 pipelining. Enabled by default. + */ + boolean ENABLE_PIPELINING = true; /** * HTTP/1 pipelining capacity. 1024 is very high, it means * 1024 requests can be present for a single client. diff --git a/netty-http-server-api/src/main/java/org/xbib/netty/http/server/api/ServerResponse.java b/netty-http-server-api/src/main/java/org/xbib/netty/http/server/api/ServerResponse.java index ce6c871..1730b6f 100644 --- a/netty-http-server-api/src/main/java/org/xbib/netty/http/server/api/ServerResponse.java +++ b/netty-http-server-api/src/main/java/org/xbib/netty/http/server/api/ServerResponse.java @@ -22,7 +22,7 @@ public interface ServerResponse extends Flushable { Long getResponseId(); - ByteBufOutputStream getOutputStream(); + ByteBufOutputStream newOutputStream(); void flush() throws IOException; diff --git a/netty-http-server/src/main/java/org/xbib/netty/http/server/DefaultServerConfig.java b/netty-http-server/src/main/java/org/xbib/netty/http/server/DefaultServerConfig.java index c961888..58b71ad 100644 --- a/netty-http-server/src/main/java/org/xbib/netty/http/server/DefaultServerConfig.java +++ b/netty-http-server/src/main/java/org/xbib/netty/http/server/DefaultServerConfig.java @@ -56,6 +56,8 @@ public class DefaultServerConfig implements ServerConfig { private int maxContentLength = Defaults.MAX_CONTENT_LENGTH; + private boolean isPipeliningEnabled = Defaults.ENABLE_PIPELINING; + private int pipeliningCapacity = Defaults.PIPELINING_CAPACITY; private int maxCompositeBufferComponents = Defaults.MAX_COMPOSITE_BUFFER_COMPONENTS; @@ -283,6 +285,15 @@ public class DefaultServerConfig implements ServerConfig { return maxContentLength; } + public ServerConfig setPipelining(boolean isPipeliningEnabled) { + this.isPipeliningEnabled = isPipeliningEnabled; + return this; + } + + public boolean isPipeliningEnabled() { + return isPipeliningEnabled; + } + public ServerConfig setPipeliningCapacity(int pipeliningCapacity) { this.pipeliningCapacity = pipeliningCapacity; return this; diff --git a/netty-http-server/src/main/java/org/xbib/netty/http/server/Server.java b/netty-http-server/src/main/java/org/xbib/netty/http/server/Server.java index 92caeaa..216fd78 100644 --- a/netty-http-server/src/main/java/org/xbib/netty/http/server/Server.java +++ b/netty-http-server/src/main/java/org/xbib/netty/http/server/Server.java @@ -662,6 +662,16 @@ public final class Server implements AutoCloseable { return this; } + public Builder enablePipelining(boolean enablePipelining) { + this.serverConfig.setPipelining(enablePipelining); + return this; + } + + public Builder setPipeliningCapacity(int pipeliningCapacity) { + this.serverConfig.setPipeliningCapacity(pipeliningCapacity); + return this; + } + public Builder setInstallHttp2Upgrade(boolean installHttp2Upgrade) { this.serverConfig.setInstallHttp2Upgrade(installHttp2Upgrade); return this; diff --git a/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http1/Http1ChannelInitializer.java b/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http1/Http1ChannelInitializer.java index 7d7aceb..8f89e1f 100644 --- a/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http1/Http1ChannelInitializer.java +++ b/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http1/Http1ChannelInitializer.java @@ -108,8 +108,10 @@ public class Http1ChannelInitializer extends ChannelInitializer pipeline.addLast("http-server-ws-handler", serverConfig.getWebSocketFrameHandler()); } - pipeline.addLast("http-server-pipelining", - new HttpPipeliningHandler(serverConfig.getPipeliningCapacity())); + if (serverConfig.isPipeliningEnabled()) { + pipeline.addLast("http-server-pipelining", + new HttpPipeliningHandler(serverConfig.getPipeliningCapacity())); + } pipeline.addLast("http-server-handler", new ServerMessages(server)); pipeline.addLast("http-idle-timeout-handler", @@ -149,8 +151,22 @@ public class Http1ChannelInitializer extends ChannelInitializer ServerTransport transport = server.newTransport(fullHttpRequest.protocolVersion()); transport.requestReceived(ctx, fullHttpRequest, httpPipelinedRequest.getSequenceId()); } - fullHttpRequest.release(); } + if (httpPipelinedRequest.refCnt() > 0) { + httpPipelinedRequest.release(); + } + } else if (msg instanceof FullHttpRequest){ + FullHttpRequest fullHttpRequest = (FullHttpRequest) msg; + if (fullHttpRequest.protocolVersion().majorVersion() == 2) { + // PRI * HTTP/2.0 + DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, + HttpResponseStatus.HTTP_VERSION_NOT_SUPPORTED); + ctx.channel().writeAndFlush(response); + } else { + ServerTransport transport = server.newTransport(fullHttpRequest.protocolVersion()); + transport.requestReceived(ctx, fullHttpRequest, 0); + } + fullHttpRequest.release(); } else { super.channelRead(ctx, msg); } diff --git a/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http1/HttpPipelinedRequest.java b/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http1/HttpPipelinedRequest.java index 2d2b179..3772bba 100644 --- a/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http1/HttpPipelinedRequest.java +++ b/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http1/HttpPipelinedRequest.java @@ -1,8 +1,11 @@ package org.xbib.netty.http.server.protocol.http1; +import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.LastHttpContent; +import io.netty.util.ReferenceCounted; -public class HttpPipelinedRequest { +public class HttpPipelinedRequest implements ReferenceCounted { private final LastHttpContent request; @@ -13,6 +16,10 @@ public class HttpPipelinedRequest { this.sequenceId = sequenceId; } + public HttpPipelinedResponse createHttpResponse(FullHttpResponse response, ChannelPromise promise) { + return new HttpPipelinedResponse(response, promise, sequenceId); + } + public LastHttpContent getRequest() { return request; } @@ -20,4 +27,43 @@ public class HttpPipelinedRequest { public int getSequenceId() { return sequenceId; } + + @Override + public int refCnt() { + return request.refCnt(); + } + + @Override + public ReferenceCounted retain() { + request.retain(); + return this; + } + + @Override + public ReferenceCounted retain(int increment) { + request.retain(increment); + return this; + } + + @Override + public ReferenceCounted touch() { + request.touch(); + return this; + } + + @Override + public ReferenceCounted touch(Object hint) { + request.touch(hint); + return this; + } + + @Override + public boolean release() { + return request.release(); + } + + @Override + public boolean release(int decrement) { + return request.release(decrement); + } } diff --git a/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http1/HttpPipelinedResponse.java b/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http1/HttpPipelinedResponse.java index af3aab2..639c537 100644 --- a/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http1/HttpPipelinedResponse.java +++ b/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http1/HttpPipelinedResponse.java @@ -1,15 +1,19 @@ package org.xbib.netty.http.server.protocol.http1; import io.netty.channel.ChannelPromise; +import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpResponse; +import io.netty.util.ReferenceCounted; -public class HttpPipelinedResponse implements Comparable { +public class HttpPipelinedResponse implements ReferenceCounted, Comparable { + + private final FullHttpResponse response; - private final HttpResponse response; private final ChannelPromise promise; + private final int sequenceId; - public HttpPipelinedResponse(HttpResponse response, ChannelPromise promise, int sequenceId) { + public HttpPipelinedResponse(FullHttpResponse response, ChannelPromise promise, int sequenceId) { this.response = response; this.promise = promise; this.sequenceId = sequenceId; @@ -29,6 +33,45 @@ public class HttpPipelinedResponse implements Comparable @Override public int compareTo(HttpPipelinedResponse other) { - return this.sequenceId - other.sequenceId; + return Integer.compare(this.sequenceId, other.sequenceId); + } + + @Override + public int refCnt() { + return response.refCnt(); + } + + @Override + public ReferenceCounted retain() { + response.retain(); + return this; + } + + @Override + public ReferenceCounted retain(int increment) { + response.retain(increment); + return this; + } + + @Override + public ReferenceCounted touch() { + response.touch(); + return this; + } + + @Override + public ReferenceCounted touch(Object hint) { + response.touch(hint); + return this; + } + + @Override + public boolean release() { + return response.release(); + } + + @Override + public boolean release(int decrement) { + return response.release(decrement); } } diff --git a/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http1/HttpPipeliningHandler.java b/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http1/HttpPipeliningHandler.java index 67da599..9a25b71 100644 --- a/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http1/HttpPipeliningHandler.java +++ b/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http1/HttpPipeliningHandler.java @@ -9,6 +9,7 @@ import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.LastHttpContent; +import java.nio.channels.ClosedChannelException; import java.nio.charset.StandardCharsets; import java.util.PriorityQueue; import java.util.Queue; @@ -46,15 +47,17 @@ public class HttpPipeliningHandler extends ChannelDuplexHandler { public HttpPipeliningHandler(int pipelineCapacity) { this.pipelineCapacity = pipelineCapacity; this.lock = new ReentrantLock(); - this.httpPipelinedResponses = new PriorityQueue<>(3); + this.httpPipelinedResponses = new PriorityQueue<>(1); this.requestCounter = new AtomicInteger(); this.writtenRequests = new AtomicInteger(); } @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof LastHttpContent) { - super.channelRead(ctx, new HttpPipelinedRequest((LastHttpContent) msg, requestCounter.getAndIncrement())); + ctx.fireChannelRead(new HttpPipelinedRequest((LastHttpContent) msg, requestCounter.getAndIncrement())); + } else { + ctx.fireChannelRead(msg); } } @@ -90,6 +93,23 @@ public class HttpPipeliningHandler extends ChannelDuplexHandler { } } + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { + if (!httpPipelinedResponses.isEmpty()) { + ClosedChannelException closedChannelException = new ClosedChannelException(); + HttpPipelinedResponse pipelinedResponse; + while ((pipelinedResponse = httpPipelinedResponses.poll()) != null) { + try { + pipelinedResponse.release(); + pipelinedResponse.getPromise().setFailure(closedChannelException); + } catch (Exception e) { + logger.log(Level.SEVERE, "unexpected error while releasing pipelined http responses", e); + } + } + } + ctx.close(promise); + } + @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { String message = cause.getMessage() == null ? "null" : cause.getMessage(); diff --git a/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http1/HttpServerResponse.java b/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http1/HttpServerResponse.java index b4b2cb1..38fcdfa 100644 --- a/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http1/HttpServerResponse.java +++ b/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http1/HttpServerResponse.java @@ -98,7 +98,7 @@ public class HttpServerResponse implements ServerResponse { } @Override - public ByteBufOutputStream getOutputStream() { + public ByteBufOutputStream newOutputStream() { return new ByteBufOutputStream(ctx.alloc().buffer()); } diff --git a/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http2/Http2ServerResponse.java b/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http2/Http2ServerResponse.java index 6838778..b3f3ff9 100644 --- a/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http2/Http2ServerResponse.java +++ b/netty-http-server/src/main/java/org/xbib/netty/http/server/protocol/http2/Http2ServerResponse.java @@ -96,7 +96,7 @@ public class Http2ServerResponse implements ServerResponse { } @Override - public ByteBufOutputStream getOutputStream() { + public ByteBufOutputStream newOutputStream() { return new ByteBufOutputStream(ctx.alloc().buffer()); } diff --git a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/hacks/HttpPipeliningHandlerTest.java b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/hacks/HttpPipeliningHandlerTest.java index cb1bbdd..d4d950a 100644 --- a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/hacks/HttpPipeliningHandlerTest.java +++ b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/hacks/HttpPipeliningHandlerTest.java @@ -19,7 +19,6 @@ import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http.QueryStringDecoder; import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.extension.ExtendWith; @@ -49,14 +48,13 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -@Disabled /* flaky */ @TestInstance(TestInstance.Lifecycle.PER_CLASS) @ExtendWith(NettyHttpTestExtension.class) class HttpPipeliningHandlerTest { private static final Logger logger = Logger.getLogger(HttpPipeliningHandlerTest.class.getName()); - private static Map waitingRequests = new ConcurrentHashMap<>(); + private static final Map waitingRequests = new ConcurrentHashMap<>(); @AfterAll void closeResources() { @@ -67,7 +65,7 @@ class HttpPipeliningHandlerTest { @Test void testThatPipeliningWorksWithFastSerializedRequests() { - WorkEmulatorHandler handler = new WorkEmulatorHandler(); + WorkEmulatorHandler handler = new WorkEmulatorHandler(Executors.newCachedThreadPool()); EmbeddedChannel embeddedChannel = new EmbeddedChannel(new HttpPipeliningHandler(10000), handler); for (int i = 0; i < 5; i++) { @@ -85,7 +83,7 @@ class HttpPipeliningHandlerTest { @Test void testThatPipeliningWorksWhenSlowRequestsInDifferentOrder() { - WorkEmulatorHandler handler = new WorkEmulatorHandler(); + WorkEmulatorHandler handler = new WorkEmulatorHandler(Executors.newCachedThreadPool()); EmbeddedChannel embeddedChannel = new EmbeddedChannel(new HttpPipeliningHandler(10000), handler); for (int i = 0; i < 5; i++) { @@ -105,7 +103,7 @@ class HttpPipeliningHandlerTest { @Test void testThatPipeliningWorksWithChunkedRequests() { - WorkEmulatorHandler handler = new WorkEmulatorHandler(); + WorkEmulatorHandler handler = new WorkEmulatorHandler(Executors.newCachedThreadPool()); EmbeddedChannel embeddedChannel = new EmbeddedChannel(new AggregateUrisAndHeadersHandler(), new HttpPipeliningHandler(10000), handler); DefaultHttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/0"); @@ -126,7 +124,7 @@ class HttpPipeliningHandlerTest { @Test void testThatPipeliningClosesConnectionWithTooManyEvents() { assertThrows(ClosedChannelException.class, () -> { - WorkEmulatorHandler handler = new WorkEmulatorHandler(); + WorkEmulatorHandler handler = new WorkEmulatorHandler(Executors.newCachedThreadPool()); EmbeddedChannel embeddedChannel = new EmbeddedChannel(new HttpPipeliningHandler(2), handler); embeddedChannel.writeInbound(createHttpRequest("/0")); @@ -170,7 +168,11 @@ class HttpPipeliningHandlerTest { private static class WorkEmulatorHandler extends SimpleChannelInboundHandler { - private final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); + private final ExecutorService executorService; + + WorkEmulatorHandler(ExecutorService executorService) { + this.executorService = executorService; + } @Override protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest pipelinedRequest) { @@ -188,7 +190,6 @@ class HttpPipeliningHandlerTest { httpResponse.headers().add(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); CountDownLatch latch = new CountDownLatch(1); waitingRequests.put(uri, latch); - // can cause RejectedExecutionException if executorService is too small executorService.submit(() -> { try { latch.await(2, TimeUnit.SECONDS); diff --git a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http1/CleartextTest.java b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http1/CleartextTest.java index 2c3bcce..9ebfa29 100644 --- a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http1/CleartextTest.java +++ b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http1/CleartextTest.java @@ -44,17 +44,16 @@ class CleartextTest { Client client = Client.builder() .build(); AtomicInteger counter = new AtomicInteger(); - final ResponseListener responseListener = resp -> { - if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) { - logger.log(Level.INFO, resp.getBodyAsString(StandardCharsets.UTF_8)); - counter.incrementAndGet(); - } - }; try { Request request = Request.get().setVersion(HttpVersion.HTTP_1_1) .url(server.getServerConfig().getAddress().base()) .content("Hello world", "text/plain") - .setResponseListener(responseListener) + .setResponseListener(resp -> { + if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) { + logger.log(Level.INFO, resp.getBodyAsString(StandardCharsets.UTF_8)); + counter.incrementAndGet(); + } + }) .build(); client.execute(request).get(); } finally { @@ -74,7 +73,8 @@ class CleartextTest { .setContentType("text/plain").build() .write(request.getContent().toString(StandardCharsets.UTF_8))) .build(); - Server server = Server.builder(domain).build(); + Server server = Server.builder(domain) + .build(); server.accept(); Client client = Client.builder() .addPoolNode(httpAddress) diff --git a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http1/StreamTest.java b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http1/StreamTest.java index 3762670..5e28026 100644 --- a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http1/StreamTest.java +++ b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http1/StreamTest.java @@ -29,7 +29,7 @@ class StreamTest { ByteBufInputStream inputStream = request.getInputStream(); String content = inputStream.readLine(); assertEquals("my body parameter", content); - ByteBufOutputStream outputStream = response.getOutputStream(); + ByteBufOutputStream outputStream = response.newOutputStream(); outputStream.writeBytes("Hello World"); response.getBuilder().setStatus(HttpResponseStatus.OK.code()).setContentType("text/plain").build() .write(outputStream); diff --git a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http2/StreamTest.java b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http2/StreamTest.java index d788eef..a6ba902 100644 --- a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http2/StreamTest.java +++ b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http2/StreamTest.java @@ -26,7 +26,7 @@ class StreamTest { ByteBufInputStream inputStream = request.getInputStream(); String content = inputStream.readLine(); assertEquals("my body parameter", content); - ByteBufOutputStream outputStream = response.getOutputStream(); + ByteBufOutputStream outputStream = response.newOutputStream(); outputStream.writeBytes("Hello World"); response.getBuilder().setStatus(HttpResponseStatus.OK.code()) .setContentType("text/plain")