fix pipelining request release

This commit is contained in:
Jörg Prante 2021-12-17 16:29:05 +01:00
parent e87e8f7acc
commit bd2658601d
15 changed files with 187 additions and 34 deletions

View file

@ -1,6 +1,6 @@
group = org.xbib
name = netty-http
version = 4.1.72.0
version = 4.1.72.1
org.gradle.warning.mode = ALL
gradle.wrapper.version = 7.3

View file

@ -68,6 +68,8 @@ public interface ServerConfig {
boolean isDecompressionEnabled();
boolean isPipeliningEnabled();
boolean isInstallHttp2Upgrade();
Http2Settings getHttp2Settings();
@ -200,6 +202,10 @@ public interface ServerConfig {
*/
int MAX_CONTENT_LENGTH = 256 * 1024 * 1024;
/**
* HTTP/1 pipelining. Enabled by default.
*/
boolean ENABLE_PIPELINING = true;
/**
* HTTP/1 pipelining capacity. 1024 is very high, it means
* 1024 requests can be present for a single client.

View file

@ -22,7 +22,7 @@ public interface ServerResponse extends Flushable {
Long getResponseId();
ByteBufOutputStream getOutputStream();
ByteBufOutputStream newOutputStream();
void flush() throws IOException;

View file

@ -56,6 +56,8 @@ public class DefaultServerConfig implements ServerConfig {
private int maxContentLength = Defaults.MAX_CONTENT_LENGTH;
private boolean isPipeliningEnabled = Defaults.ENABLE_PIPELINING;
private int pipeliningCapacity = Defaults.PIPELINING_CAPACITY;
private int maxCompositeBufferComponents = Defaults.MAX_COMPOSITE_BUFFER_COMPONENTS;
@ -283,6 +285,15 @@ public class DefaultServerConfig implements ServerConfig {
return maxContentLength;
}
public ServerConfig setPipelining(boolean isPipeliningEnabled) {
this.isPipeliningEnabled = isPipeliningEnabled;
return this;
}
public boolean isPipeliningEnabled() {
return isPipeliningEnabled;
}
public ServerConfig setPipeliningCapacity(int pipeliningCapacity) {
this.pipeliningCapacity = pipeliningCapacity;
return this;

View file

@ -662,6 +662,16 @@ public final class Server implements AutoCloseable {
return this;
}
public Builder enablePipelining(boolean enablePipelining) {
this.serverConfig.setPipelining(enablePipelining);
return this;
}
public Builder setPipeliningCapacity(int pipeliningCapacity) {
this.serverConfig.setPipeliningCapacity(pipeliningCapacity);
return this;
}
public Builder setInstallHttp2Upgrade(boolean installHttp2Upgrade) {
this.serverConfig.setInstallHttp2Upgrade(installHttp2Upgrade);
return this;

View file

@ -108,8 +108,10 @@ public class Http1ChannelInitializer extends ChannelInitializer<Channel>
pipeline.addLast("http-server-ws-handler",
serverConfig.getWebSocketFrameHandler());
}
pipeline.addLast("http-server-pipelining",
new HttpPipeliningHandler(serverConfig.getPipeliningCapacity()));
if (serverConfig.isPipeliningEnabled()) {
pipeline.addLast("http-server-pipelining",
new HttpPipeliningHandler(serverConfig.getPipeliningCapacity()));
}
pipeline.addLast("http-server-handler",
new ServerMessages(server));
pipeline.addLast("http-idle-timeout-handler",
@ -149,8 +151,22 @@ public class Http1ChannelInitializer extends ChannelInitializer<Channel>
ServerTransport transport = server.newTransport(fullHttpRequest.protocolVersion());
transport.requestReceived(ctx, fullHttpRequest, httpPipelinedRequest.getSequenceId());
}
fullHttpRequest.release();
}
if (httpPipelinedRequest.refCnt() > 0) {
httpPipelinedRequest.release();
}
} else if (msg instanceof FullHttpRequest){
FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;
if (fullHttpRequest.protocolVersion().majorVersion() == 2) {
// PRI * HTTP/2.0
DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.HTTP_VERSION_NOT_SUPPORTED);
ctx.channel().writeAndFlush(response);
} else {
ServerTransport transport = server.newTransport(fullHttpRequest.protocolVersion());
transport.requestReceived(ctx, fullHttpRequest, 0);
}
fullHttpRequest.release();
} else {
super.channelRead(ctx, msg);
}

View file

@ -1,8 +1,11 @@
package org.xbib.netty.http.server.protocol.http1;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCounted;
public class HttpPipelinedRequest {
public class HttpPipelinedRequest implements ReferenceCounted {
private final LastHttpContent request;
@ -13,6 +16,10 @@ public class HttpPipelinedRequest {
this.sequenceId = sequenceId;
}
public HttpPipelinedResponse createHttpResponse(FullHttpResponse response, ChannelPromise promise) {
return new HttpPipelinedResponse(response, promise, sequenceId);
}
public LastHttpContent getRequest() {
return request;
}
@ -20,4 +27,43 @@ public class HttpPipelinedRequest {
public int getSequenceId() {
return sequenceId;
}
@Override
public int refCnt() {
return request.refCnt();
}
@Override
public ReferenceCounted retain() {
request.retain();
return this;
}
@Override
public ReferenceCounted retain(int increment) {
request.retain(increment);
return this;
}
@Override
public ReferenceCounted touch() {
request.touch();
return this;
}
@Override
public ReferenceCounted touch(Object hint) {
request.touch(hint);
return this;
}
@Override
public boolean release() {
return request.release();
}
@Override
public boolean release(int decrement) {
return request.release(decrement);
}
}

View file

@ -1,15 +1,19 @@
package org.xbib.netty.http.server.protocol.http1;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.util.ReferenceCounted;
public class HttpPipelinedResponse implements Comparable<HttpPipelinedResponse> {
public class HttpPipelinedResponse implements ReferenceCounted, Comparable<HttpPipelinedResponse> {
private final FullHttpResponse response;
private final HttpResponse response;
private final ChannelPromise promise;
private final int sequenceId;
public HttpPipelinedResponse(HttpResponse response, ChannelPromise promise, int sequenceId) {
public HttpPipelinedResponse(FullHttpResponse response, ChannelPromise promise, int sequenceId) {
this.response = response;
this.promise = promise;
this.sequenceId = sequenceId;
@ -29,6 +33,45 @@ public class HttpPipelinedResponse implements Comparable<HttpPipelinedResponse>
@Override
public int compareTo(HttpPipelinedResponse other) {
return this.sequenceId - other.sequenceId;
return Integer.compare(this.sequenceId, other.sequenceId);
}
@Override
public int refCnt() {
return response.refCnt();
}
@Override
public ReferenceCounted retain() {
response.retain();
return this;
}
@Override
public ReferenceCounted retain(int increment) {
response.retain(increment);
return this;
}
@Override
public ReferenceCounted touch() {
response.touch();
return this;
}
@Override
public ReferenceCounted touch(Object hint) {
response.touch(hint);
return this;
}
@Override
public boolean release() {
return response.release();
}
@Override
public boolean release(int decrement) {
return response.release(decrement);
}
}

View file

@ -9,6 +9,7 @@ import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.PriorityQueue;
import java.util.Queue;
@ -46,15 +47,17 @@ public class HttpPipeliningHandler extends ChannelDuplexHandler {
public HttpPipeliningHandler(int pipelineCapacity) {
this.pipelineCapacity = pipelineCapacity;
this.lock = new ReentrantLock();
this.httpPipelinedResponses = new PriorityQueue<>(3);
this.httpPipelinedResponses = new PriorityQueue<>(1);
this.requestCounter = new AtomicInteger();
this.writtenRequests = new AtomicInteger();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof LastHttpContent) {
super.channelRead(ctx, new HttpPipelinedRequest((LastHttpContent) msg, requestCounter.getAndIncrement()));
ctx.fireChannelRead(new HttpPipelinedRequest((LastHttpContent) msg, requestCounter.getAndIncrement()));
} else {
ctx.fireChannelRead(msg);
}
}
@ -90,6 +93,23 @@ public class HttpPipeliningHandler extends ChannelDuplexHandler {
}
}
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
if (!httpPipelinedResponses.isEmpty()) {
ClosedChannelException closedChannelException = new ClosedChannelException();
HttpPipelinedResponse pipelinedResponse;
while ((pipelinedResponse = httpPipelinedResponses.poll()) != null) {
try {
pipelinedResponse.release();
pipelinedResponse.getPromise().setFailure(closedChannelException);
} catch (Exception e) {
logger.log(Level.SEVERE, "unexpected error while releasing pipelined http responses", e);
}
}
}
ctx.close(promise);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
String message = cause.getMessage() == null ? "null" : cause.getMessage();

View file

@ -98,7 +98,7 @@ public class HttpServerResponse implements ServerResponse {
}
@Override
public ByteBufOutputStream getOutputStream() {
public ByteBufOutputStream newOutputStream() {
return new ByteBufOutputStream(ctx.alloc().buffer());
}

View file

@ -96,7 +96,7 @@ public class Http2ServerResponse implements ServerResponse {
}
@Override
public ByteBufOutputStream getOutputStream() {
public ByteBufOutputStream newOutputStream() {
return new ByteBufOutputStream(ctx.alloc().buffer());
}

View file

@ -19,7 +19,6 @@ import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.QueryStringDecoder;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
@ -49,14 +48,13 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Disabled /* flaky */
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ExtendWith(NettyHttpTestExtension.class)
class HttpPipeliningHandlerTest {
private static final Logger logger = Logger.getLogger(HttpPipeliningHandlerTest.class.getName());
private static Map<String, CountDownLatch> waitingRequests = new ConcurrentHashMap<>();
private static final Map<String, CountDownLatch> waitingRequests = new ConcurrentHashMap<>();
@AfterAll
void closeResources() {
@ -67,7 +65,7 @@ class HttpPipeliningHandlerTest {
@Test
void testThatPipeliningWorksWithFastSerializedRequests() {
WorkEmulatorHandler handler = new WorkEmulatorHandler();
WorkEmulatorHandler handler = new WorkEmulatorHandler(Executors.newCachedThreadPool());
EmbeddedChannel embeddedChannel = new EmbeddedChannel(new HttpPipeliningHandler(10000),
handler);
for (int i = 0; i < 5; i++) {
@ -85,7 +83,7 @@ class HttpPipeliningHandlerTest {
@Test
void testThatPipeliningWorksWhenSlowRequestsInDifferentOrder() {
WorkEmulatorHandler handler = new WorkEmulatorHandler();
WorkEmulatorHandler handler = new WorkEmulatorHandler(Executors.newCachedThreadPool());
EmbeddedChannel embeddedChannel = new EmbeddedChannel(new HttpPipeliningHandler(10000),
handler);
for (int i = 0; i < 5; i++) {
@ -105,7 +103,7 @@ class HttpPipeliningHandlerTest {
@Test
void testThatPipeliningWorksWithChunkedRequests() {
WorkEmulatorHandler handler = new WorkEmulatorHandler();
WorkEmulatorHandler handler = new WorkEmulatorHandler(Executors.newCachedThreadPool());
EmbeddedChannel embeddedChannel = new EmbeddedChannel(new AggregateUrisAndHeadersHandler(),
new HttpPipeliningHandler(10000), handler);
DefaultHttpRequest httpRequest = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/0");
@ -126,7 +124,7 @@ class HttpPipeliningHandlerTest {
@Test
void testThatPipeliningClosesConnectionWithTooManyEvents() {
assertThrows(ClosedChannelException.class, () -> {
WorkEmulatorHandler handler = new WorkEmulatorHandler();
WorkEmulatorHandler handler = new WorkEmulatorHandler(Executors.newCachedThreadPool());
EmbeddedChannel embeddedChannel = new EmbeddedChannel(new HttpPipeliningHandler(2),
handler);
embeddedChannel.writeInbound(createHttpRequest("/0"));
@ -170,7 +168,11 @@ class HttpPipeliningHandlerTest {
private static class WorkEmulatorHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest> {
private final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private final ExecutorService executorService;
WorkEmulatorHandler(ExecutorService executorService) {
this.executorService = executorService;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpPipelinedRequest pipelinedRequest) {
@ -188,7 +190,6 @@ class HttpPipeliningHandlerTest {
httpResponse.headers().add(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
CountDownLatch latch = new CountDownLatch(1);
waitingRequests.put(uri, latch);
// can cause RejectedExecutionException if executorService is too small
executorService.submit(() -> {
try {
latch.await(2, TimeUnit.SECONDS);

View file

@ -44,17 +44,16 @@ class CleartextTest {
Client client = Client.builder()
.build();
AtomicInteger counter = new AtomicInteger();
final ResponseListener<HttpResponse> responseListener = resp -> {
if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) {
logger.log(Level.INFO, resp.getBodyAsString(StandardCharsets.UTF_8));
counter.incrementAndGet();
}
};
try {
Request request = Request.get().setVersion(HttpVersion.HTTP_1_1)
.url(server.getServerConfig().getAddress().base())
.content("Hello world", "text/plain")
.setResponseListener(responseListener)
.setResponseListener(resp -> {
if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) {
logger.log(Level.INFO, resp.getBodyAsString(StandardCharsets.UTF_8));
counter.incrementAndGet();
}
})
.build();
client.execute(request).get();
} finally {
@ -74,7 +73,8 @@ class CleartextTest {
.setContentType("text/plain").build()
.write(request.getContent().toString(StandardCharsets.UTF_8)))
.build();
Server server = Server.builder(domain).build();
Server server = Server.builder(domain)
.build();
server.accept();
Client client = Client.builder()
.addPoolNode(httpAddress)

View file

@ -29,7 +29,7 @@ class StreamTest {
ByteBufInputStream inputStream = request.getInputStream();
String content = inputStream.readLine();
assertEquals("my body parameter", content);
ByteBufOutputStream outputStream = response.getOutputStream();
ByteBufOutputStream outputStream = response.newOutputStream();
outputStream.writeBytes("Hello World");
response.getBuilder().setStatus(HttpResponseStatus.OK.code()).setContentType("text/plain").build()
.write(outputStream);

View file

@ -26,7 +26,7 @@ class StreamTest {
ByteBufInputStream inputStream = request.getInputStream();
String content = inputStream.readLine();
assertEquals("my body parameter", content);
ByteBufOutputStream outputStream = response.getOutputStream();
ByteBufOutputStream outputStream = response.newOutputStream();
outputStream.writeBytes("Hello World");
response.getBuilder().setStatus(HttpResponseStatus.OK.code())
.setContentType("text/plain")