From 53ab059bb33224fdfb67e2d8578cf5c13e833653 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Tue, 1 Oct 2019 09:46:40 +0200 Subject: [PATCH] more feature for form post parameters, chunked upload --- gradle.properties | 4 +- .../xbib/netty/http/client/api/Request.java | 34 ++- .../handler/http/Http1ChannelInitializer.java | 13 +- .../http2/Http2ChannelInitializer.java | 8 +- .../Http2StreamFrameToHttpObjectCodec.java | 225 ----------------- .../http/client/transport/BaseTransport.java | 13 +- .../netty/http/client/transport/Flow.java | 4 +- .../http/client/transport/Http1Transport.java | 40 ++- .../http/client/transport/Http2Transport.java | 5 +- .../http/client/test/http2/GoogleTest.java | 3 +- .../common/mime/MalvaMimeMultipartParser.java | 229 ++++++++++++++++++ .../netty/http/common/mime/MimeMultipart.java | 13 + .../common/mime/MimeMultipartListener.java | 6 + .../http/common/mime/MimeMultipartParser.java | 12 + .../xbib/netty/http/common/mime/MimePart.java | 41 ++++ netty-http-server-reactive/NOTICE.txt | 3 + .../server/reactive/HandlerPublisher.java | 31 +-- .../server/reactive/HandlerSubscriber.java | 11 +- .../reactive/HttpStreamsClientHandler.java | 7 +- .../server/reactive/HttpStreamsHandler.java | 97 ++------ .../reactive/HttpStreamsServerHandler.java | 24 +- .../handler/http/Http1ChannelInitializer.java | 2 +- .../http2/Http2ChannelInitializer.java | 1 + .../Http2StreamFrameToHttpObjectCodec.java | 215 ---------------- .../server/transport/HttpServerRequest.java | 21 +- .../server/test/NettyHttpTestExtension.java | 1 + .../test/hacks/HttpPipeliningHandlerTest.java | 4 +- .../server/test/http1/MimeUploadTest.java | 83 +++++++ .../http/server/test/http1/PostTest.java | 49 ++++ 29 files changed, 578 insertions(+), 621 deletions(-) delete mode 100644 netty-http-client/src/main/java/org/xbib/netty/http/client/handler/http2/Http2StreamFrameToHttpObjectCodec.java create mode 100644 netty-http-common/src/main/java/org/xbib/netty/http/common/mime/MalvaMimeMultipartParser.java create mode 100644 netty-http-common/src/main/java/org/xbib/netty/http/common/mime/MimeMultipart.java create mode 100644 netty-http-common/src/main/java/org/xbib/netty/http/common/mime/MimeMultipartListener.java create mode 100644 netty-http-common/src/main/java/org/xbib/netty/http/common/mime/MimeMultipartParser.java create mode 100644 netty-http-common/src/main/java/org/xbib/netty/http/common/mime/MimePart.java create mode 100644 netty-http-server-reactive/NOTICE.txt delete mode 100644 netty-http-server/src/main/java/org/xbib/netty/http/server/handler/http2/Http2StreamFrameToHttpObjectCodec.java create mode 100644 netty-http-server/src/test/java/org/xbib/netty/http/server/test/http1/MimeUploadTest.java diff --git a/gradle.properties b/gradle.properties index 327e5f5..1e8b5a3 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,13 +1,13 @@ group = org.xbib name = netty-http -version = 4.1.41.1 +version = 4.1.41.2 # netty netty.version = 4.1.41.Final tcnative.version = 2.0.25.Final # for netty-http-common -xbib-net-url.version = 2.0.2 +xbib-net-url.version = 2.0.3 # for netty-http-server bouncycastle.version = 1.62 diff --git a/netty-http-client-api/src/main/java/org/xbib/netty/http/client/api/Request.java b/netty-http-client-api/src/main/java/org/xbib/netty/http/client/api/Request.java index f39e939..2a0f403 100644 --- a/netty-http-client-api/src/main/java/org/xbib/netty/http/client/api/Request.java +++ b/netty-http-client-api/src/main/java/org/xbib/netty/http/client/api/Request.java @@ -13,6 +13,7 @@ import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.QueryStringDecoder; import io.netty.handler.codec.http.QueryStringEncoder; +import io.netty.handler.codec.http.multipart.InterfaceHttpData; import io.netty.handler.codec.http2.HttpConversionUtil; import io.netty.util.AsciiString; import org.xbib.net.PercentEncoder; @@ -57,6 +58,8 @@ public final class Request { private final ByteBuf content; + private final List bodyData; + private final long timeoutInMillis; private final boolean followRedirect; @@ -74,7 +77,7 @@ public final class Request { private ResponseListener responseListener; private Request(URL url, String uri, HttpVersion httpVersion, HttpMethod httpMethod, - HttpHeaders headers, Collection cookies, ByteBuf content, + HttpHeaders headers, Collection cookies, ByteBuf content, List bodyData, long timeoutInMillis, boolean followRedirect, int maxRedirect, int redirectCount, boolean isBackOff, BackOff backOff, ResponseListener responseListener) { this.url = url; @@ -84,6 +87,7 @@ public final class Request { this.headers = headers; this.cookies = cookies; this.content = content; + this.bodyData = bodyData; this.timeoutInMillis = timeoutInMillis; this.followRedirect = followRedirect; this.maxRedirects = maxRedirect; @@ -126,6 +130,10 @@ public final class Request { return content; } + public List getBodyData() { + return bodyData; + } + /** * Return the timeout in milliseconds per request. This overrides the read timeout of the client. * @return timeout timeout in milliseconds @@ -306,6 +314,8 @@ public final class Request { private ByteBuf content; + private List bodyData; + private long timeoutInMillis; private boolean followRedirect; @@ -333,6 +343,7 @@ public final class Request { this.removeHeaders = new ArrayList<>(); this.cookies = new HashSet<>(); this.uriParameters = new HttpParameters(); + this.bodyData = new ArrayList<>(); charset(StandardCharsets.UTF_8); } @@ -439,6 +450,13 @@ public final class Request { return this; } + public Builder addRawParameter(String name, String value) { + Objects.requireNonNull(name); + Objects.requireNonNull(value); + uriParameters.add(name, value); + return this; + } + public Builder addFormParameter(String name, String value) { Objects.requireNonNull(name); Objects.requireNonNull(value); @@ -446,6 +464,18 @@ public final class Request { return this; } + public Builder addRawFormParameter(String name, String value) { + Objects.requireNonNull(name); + Objects.requireNonNull(value); + formParameters.add(name, value); + return this; + } + + public Builder addBodyData(InterfaceHttpData data) { + bodyData.add(data); + return this; + } + private String encode(CharSequence contentType, String value) { if (value == null) { return null; @@ -631,7 +661,7 @@ public final class Request { for (String headerName : removeHeaders) { validatedHeaders.remove(headerName); } - return new Request(url, uri, httpVersion, httpMethod, validatedHeaders, cookies, content, + return new Request(url, uri, httpVersion, httpMethod, validatedHeaders, cookies, content, bodyData, timeoutInMillis, followRedirect, maxRedirects, 0, enableBackOff, backOff, responseListener); } diff --git a/netty-http-client/src/main/java/org/xbib/netty/http/client/handler/http/Http1ChannelInitializer.java b/netty-http-client/src/main/java/org/xbib/netty/http/client/handler/http/Http1ChannelInitializer.java index c7c22fa..0bbd343 100644 --- a/netty-http-client/src/main/java/org/xbib/netty/http/client/handler/http/Http1ChannelInitializer.java +++ b/netty-http-client/src/main/java/org/xbib/netty/http/client/handler/http/Http1ChannelInitializer.java @@ -11,6 +11,7 @@ import io.netty.handler.logging.LogLevel; import io.netty.handler.ssl.ApplicationProtocolNames; import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler; import io.netty.handler.ssl.SslHandler; +import io.netty.handler.stream.ChunkedWriteHandler; import org.xbib.netty.http.client.Client; import org.xbib.netty.http.client.ClientConfig; import org.xbib.netty.http.client.api.HttpChannelInitializer; @@ -63,7 +64,7 @@ public class Http1ChannelInitializer extends ChannelInitializer impleme private void configureEncrypted(Channel channel) { ChannelPipeline pipeline = channel.pipeline(); SslHandler sslHandler = sslHandlerFactory.create(); - pipeline.addLast("ssl-handler", sslHandler); + pipeline.addLast("client-ssl-handler", sslHandler); if (clientConfig.isEnableNegotiation()) { ApplicationProtocolNegotiationHandler negotiationHandler = new ApplicationProtocolNegotiationHandler(ApplicationProtocolNames.HTTP_1_1) { @@ -95,15 +96,17 @@ public class Http1ChannelInitializer extends ChannelInitializer impleme private void configureCleartext(Channel channel) { ChannelPipeline pipeline = channel.pipeline(); - pipeline.addLast(new HttpClientCodec(clientConfig.getMaxInitialLineLength(), + //pipeline.addLast("client-chunk-compressor", new HttpChunkContentCompressor(6)); + pipeline.addLast("http-client-chunk-writer", new ChunkedWriteHandler()); + pipeline.addLast("http-client-codec", new HttpClientCodec(clientConfig.getMaxInitialLineLength(), clientConfig.getMaxHeadersSize(), clientConfig.getMaxChunkSize())); if (clientConfig.isEnableGzip()) { - pipeline.addLast(new HttpContentDecompressor()); + pipeline.addLast("http-client-decompressor", new HttpContentDecompressor()); } HttpObjectAggregator httpObjectAggregator = new HttpObjectAggregator(clientConfig.getMaxContentLength(), false); httpObjectAggregator.setMaxCumulationBufferComponents(clientConfig.getMaxCompositeBufferComponents()); - pipeline.addLast(httpObjectAggregator); - pipeline.addLast(httpResponseHandler); + pipeline.addLast("http-client-aggregator", httpObjectAggregator); + pipeline.addLast("http-client-handler", httpResponseHandler); } } diff --git a/netty-http-client/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ChannelInitializer.java b/netty-http-client/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ChannelInitializer.java index 0aa8692..eb30991 100644 --- a/netty-http-client/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ChannelInitializer.java +++ b/netty-http-client/src/main/java/org/xbib/netty/http/client/handler/http2/Http2ChannelInitializer.java @@ -76,12 +76,10 @@ public class Http2ChannelInitializer extends ChannelInitializer impleme Http2MultiplexCodec multiplexCodec = multiplexCodecBuilder.autoAckSettingsFrame(true) .build(); ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("client-multiplex", multiplexCodec); - // does not work - //pipeline.addLast("client-decompressor", new HttpContentDecompressor()); pipeline.addLast("client-messages", new ClientMessages()); } - class ClientMessages extends ChannelInboundHandlerAdapter { + static class ClientMessages extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { @@ -118,9 +116,9 @@ public class Http2ChannelInitializer extends ChannelInitializer impleme } } - class PushPromiseHandler extends Http2FrameLogger { + static class PushPromiseHandler extends Http2FrameLogger { - public PushPromiseHandler(LogLevel level, String name) { + PushPromiseHandler(LogLevel level, String name) { super(level, name); } diff --git a/netty-http-client/src/main/java/org/xbib/netty/http/client/handler/http2/Http2StreamFrameToHttpObjectCodec.java b/netty-http-client/src/main/java/org/xbib/netty/http/client/handler/http2/Http2StreamFrameToHttpObjectCodec.java deleted file mode 100644 index 94b729f..0000000 --- a/netty-http-client/src/main/java/org/xbib/netty/http/client/handler/http2/Http2StreamFrameToHttpObjectCodec.java +++ /dev/null @@ -1,225 +0,0 @@ -package org.xbib.netty.http.client.handler.http2; - -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandler.Sharable; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.EncoderException; -import io.netty.handler.codec.MessageToMessageCodec; -import io.netty.handler.codec.http.DefaultHttpContent; -import io.netty.handler.codec.http.DefaultLastHttpContent; -import io.netty.handler.codec.http.FullHttpMessage; -import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.HttpContent; -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpHeaderValues; -import io.netty.handler.codec.http.HttpMessage; -import io.netty.handler.codec.http.HttpObject; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpResponse; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpScheme; -import io.netty.handler.codec.http.HttpUtil; -import io.netty.handler.codec.http.HttpVersion; -import io.netty.handler.codec.http.LastHttpContent; -import io.netty.handler.codec.http2.DefaultHttp2DataFrame; -import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame; -import io.netty.handler.codec.http2.Http2DataFrame; -import io.netty.handler.codec.http2.Http2Exception; -import io.netty.handler.codec.http2.Http2Headers; -import io.netty.handler.codec.http2.Http2HeadersFrame; -import io.netty.handler.codec.http2.Http2StreamChannel; -import io.netty.handler.codec.http2.Http2StreamFrame; -import io.netty.handler.codec.http2.HttpConversionUtil; -import io.netty.handler.ssl.SslHandler; -import io.netty.util.internal.UnstableApi; - -import java.util.List; - -/** - * This handler converts from {@link Http2StreamFrame} to {@link HttpObject}, - * and back. It can be used as an adapter to make http/2 connections backward-compatible with - * {@link ChannelHandler}s expecting {@link HttpObject}. - * - * For simplicity, it converts to chunked encoding unless the entire stream - * is a single header. - * - * Patched version of original Netty's Http2StreamFrameToHttpObjectCodec. - * This one is using the streamId from {@code frame.stream().id()}. - */ -@UnstableApi -@Sharable -public class Http2StreamFrameToHttpObjectCodec extends MessageToMessageCodec { - - private final boolean isServer; - - private final boolean validateHeaders; - - private HttpScheme scheme; - - public Http2StreamFrameToHttpObjectCodec(final boolean isServer, - final boolean validateHeaders) { - this.isServer = isServer; - this.validateHeaders = validateHeaders; - scheme = HttpScheme.HTTP; - } - - public Http2StreamFrameToHttpObjectCodec(final boolean isServer) { - this(isServer, true); - } - - @Override - public boolean acceptInboundMessage(Object msg) throws Exception { - return (msg instanceof Http2HeadersFrame) || (msg instanceof Http2DataFrame); - } - - @Override - protected void decode(ChannelHandlerContext ctx, Http2StreamFrame frame, List out) throws Exception { - if (frame instanceof Http2HeadersFrame) { - int id = frame.stream() != null ? frame.stream().id() : -1; - Http2HeadersFrame headersFrame = (Http2HeadersFrame) frame; - Http2Headers headers = headersFrame.headers(); - - final CharSequence status = headers.status(); - - // 100-continue response is a special case where Http2HeadersFrame#isEndStream=false - // but we need to decode it as a FullHttpResponse to play nice with HttpObjectAggregator. - if (null != status && HttpResponseStatus.CONTINUE.codeAsText().contentEquals(status)) { - final FullHttpMessage fullMsg = newFullMessage(id, headers, ctx.alloc()); - out.add(fullMsg); - return; - } - - if (headersFrame.isEndStream()) { - if (headers.method() == null && status == null) { - LastHttpContent last = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, validateHeaders); - HttpConversionUtil.addHttp2ToHttpHeaders(id, headers, last.trailingHeaders(), - HttpVersion.HTTP_1_1, true, true); - out.add(last); - } else { - FullHttpMessage full = newFullMessage(id, headers, ctx.alloc()); - out.add(full); - } - } else { - HttpMessage req = newMessage(id, headers); - if (!HttpUtil.isContentLengthSet(req)) { - req.headers().add(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); - } - out.add(req); - } - } else if (frame instanceof Http2DataFrame) { - Http2DataFrame dataFrame = (Http2DataFrame) frame; - if (dataFrame.isEndStream()) { - out.add(new DefaultLastHttpContent(dataFrame.content().retain(), validateHeaders)); - } else { - out.add(new DefaultHttpContent(dataFrame.content().retain())); - } - } - } - - private void encodeLastContent(LastHttpContent last, List out) { - boolean needFiller = !(last instanceof FullHttpMessage) && last.trailingHeaders().isEmpty(); - if (last.content().isReadable() || needFiller) { - out.add(new DefaultHttp2DataFrame(last.content().retain(), last.trailingHeaders().isEmpty())); - } - if (!last.trailingHeaders().isEmpty()) { - Http2Headers headers = HttpConversionUtil.toHttp2Headers(last.trailingHeaders(), validateHeaders); - out.add(new DefaultHttp2HeadersFrame(headers, true)); - } - } - - /** - * Encode from an {@link HttpObject} to an {@link Http2StreamFrame}. This method will - * be called for each written message that can be handled by this encoder. - * - * NOTE: 100-Continue responses that are NOT {@link FullHttpResponse} will be rejected. - * - * @param ctx the {@link ChannelHandlerContext} which this handler belongs to - * @param obj the {@link HttpObject} message to encode - * @param out the {@link List} into which the encoded msg should be added - * needs to do some kind of aggregation - * @throws Exception is thrown if an error occurs - */ - @Override - protected void encode(ChannelHandlerContext ctx, HttpObject obj, List out) throws Exception { - // 100-continue is typically a FullHttpResponse, but the decoded - // Http2HeadersFrame should not be marked as endStream=true - if (obj instanceof HttpResponse) { - final HttpResponse res = (HttpResponse) obj; - if (res.status().equals(HttpResponseStatus.CONTINUE)) { - if (res instanceof FullHttpResponse) { - final Http2Headers headers = toHttp2Headers(res); - out.add(new DefaultHttp2HeadersFrame(headers, false)); - return; - } else { - throw new EncoderException( - HttpResponseStatus.CONTINUE.toString() + " must be a FullHttpResponse"); - } - } - } - - if (obj instanceof HttpMessage) { - Http2Headers headers = toHttp2Headers((HttpMessage) obj); - boolean noMoreFrames = false; - if (obj instanceof FullHttpMessage) { - FullHttpMessage full = (FullHttpMessage) obj; - noMoreFrames = !full.content().isReadable() && full.trailingHeaders().isEmpty(); - } - - out.add(new DefaultHttp2HeadersFrame(headers, noMoreFrames)); - } - - if (obj instanceof LastHttpContent) { - LastHttpContent last = (LastHttpContent) obj; - encodeLastContent(last, out); - } else if (obj instanceof HttpContent) { - HttpContent cont = (HttpContent) obj; - out.add(new DefaultHttp2DataFrame(cont.content().retain(), false)); - } - } - - private Http2Headers toHttp2Headers(final HttpMessage msg) { - if (msg instanceof HttpRequest) { - msg.headers().set( - HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), - scheme.name()); - } - - return HttpConversionUtil.toHttp2Headers(msg, validateHeaders); - } - - private HttpMessage newMessage(final int id, - final Http2Headers headers) throws Http2Exception { - return isServer ? - HttpConversionUtil.toHttpRequest(id, headers, validateHeaders) : - HttpConversionUtil.toHttpResponse(id, headers, validateHeaders); - } - - private FullHttpMessage newFullMessage(final int id, - final Http2Headers headers, - final ByteBufAllocator alloc) throws Http2Exception { - return isServer ? - HttpConversionUtil.toFullHttpRequest(id, headers, alloc, validateHeaders) : - HttpConversionUtil.toFullHttpResponse(id, headers, alloc, validateHeaders); - } - - @Override - public void handlerAdded(final ChannelHandlerContext ctx) throws Exception { - super.handlerAdded(ctx); - - // this handler is typically used on an Http2StreamChannel. at this - // stage, ssl handshake should've been established. checking for the - // presence of SslHandler in the parent's channel pipeline to - // determine the HTTP scheme should suffice, even for the case where - // SniHandler is used. - scheme = isSsl(ctx) ? HttpScheme.HTTPS : HttpScheme.HTTP; - } - - protected boolean isSsl(final ChannelHandlerContext ctx) { - final Channel ch = ctx.channel(); - final Channel connChannel = (ch instanceof Http2StreamChannel) ? ch.parent() : ch; - return null != connChannel.pipeline().get(SslHandler.class); - } -} diff --git a/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java b/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java index ec89462..52a1adf 100644 --- a/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java +++ b/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java @@ -3,6 +3,8 @@ package org.xbib.netty.http.client.transport; import io.netty.channel.Channel; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpMethod; +import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory; +import io.netty.handler.codec.http.multipart.HttpDataFactory; import io.netty.handler.ssl.SslHandler; import org.xbib.net.PercentDecoder; import org.xbib.net.URL; @@ -58,12 +60,15 @@ public abstract class BaseTransport implements Transport { private CookieBox cookieBox; + protected HttpDataFactory httpDataFactory; + BaseTransport(Client client, HttpAddress httpAddress) { this.client = client; this.httpAddress = httpAddress; this.channels = new ConcurrentHashMap<>(); this.flowMap = new ConcurrentHashMap<>(); this.requests = new ConcurrentSkipListMap<>(); + this.httpDataFactory = new DefaultHttpDataFactory(); } @Override @@ -104,6 +109,7 @@ public abstract class BaseTransport implements Transport { flow.close(); } channels.clear(); + httpDataFactory.cleanAllHttpData(); // do not clear requests } @@ -296,10 +302,7 @@ public abstract class BaseTransport implements Transport { hostAndPort.append(':').append(redirUrl.getPort()); } newHttpRequest.headers().set(HttpHeaderNames.HOST, hostAndPort.toString()); - logger.log(Level.FINE, "redirect url: " + redirUrl + - " old request: " + request.toString() + - " new request: " + newHttpRequest.toString()); - request.release(); + logger.log(Level.FINE, "redirect url: " + redirUrl); return newHttpRequest; } break; @@ -338,7 +341,7 @@ public abstract class BaseTransport implements Transport { if (backOff != null) { long millis = backOff.nextBackOffMillis(); if (millis != BackOff.STOP) { - logger.log(Level.FINE, "status = " + status + " backing off request by " + millis + " milliseconds"); + logger.log(Level.FINE, () -> "status = " + status + " backing off request by " + millis + " milliseconds"); try { Thread.sleep(millis); } catch (InterruptedException e) { diff --git a/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/Flow.java b/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/Flow.java index b23685f..c8e88e4 100644 --- a/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/Flow.java +++ b/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/Flow.java @@ -26,11 +26,11 @@ class Flow { } Integer firstKey() { - return map.firstKey(); + return map.isEmpty() ? null : map.firstKey(); } Integer lastKey() { - return map.lastKey(); + return map.isEmpty() ? null : map.lastKey(); } void put(Integer key, CompletableFuture promise) { diff --git a/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/Http1Transport.java b/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/Http1Transport.java index 8a4f315..aae971b 100644 --- a/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/Http1Transport.java +++ b/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/Http1Transport.java @@ -5,6 +5,7 @@ import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; +import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder; import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Settings; import io.netty.handler.codec.http2.HttpConversionUtil; @@ -16,8 +17,6 @@ import org.xbib.netty.http.client.cookie.ClientCookieEncoder; import org.xbib.netty.http.common.DefaultHttpResponse; import org.xbib.netty.http.common.HttpAddress; import org.xbib.netty.http.client.api.Request; -import org.xbib.netty.http.client.api.ResponseListener; -import org.xbib.netty.http.common.HttpResponse; import org.xbib.netty.http.common.cookie.Cookie; import java.io.IOException; @@ -51,6 +50,7 @@ public class Http1Transport extends BaseTransport { FullHttpRequest fullHttpRequest = request.content() == null ? new DefaultFullHttpRequest(request.httpVersion(), request.httpMethod(), uri) : new DefaultFullHttpRequest(request.httpVersion(), request.httpMethod(), uri, request.content()); + HttpPostRequestEncoder httpPostRequestEncoder = null; final Integer streamId = flowMap.get(channelId).nextStreamId(); if (streamId == null) { throw new IllegalStateException(); @@ -68,9 +68,25 @@ public class Http1Transport extends BaseTransport { } // add stream-id and cookie headers fullHttpRequest.headers().set(request.headers()); - // flush after putting request into requests map + if (request.content() == null && !request.getBodyData().isEmpty()) { + try { + httpPostRequestEncoder = + new HttpPostRequestEncoder(httpDataFactory, fullHttpRequest, true); + httpPostRequestEncoder.setBodyHttpDatas(request.getBodyData()); + httpPostRequestEncoder.finalizeRequest(); + } catch (HttpPostRequestEncoder.ErrorDataEncoderException e) { + throw new IOException(e); + } + } if (channel.isWritable()) { - channel.writeAndFlush(fullHttpRequest); + channel.write(fullHttpRequest); + if (httpPostRequestEncoder != null && httpPostRequestEncoder.isChunked()) { + channel.write(httpPostRequestEncoder); + } + channel.flush(); + if (httpPostRequestEncoder != null) { + httpPostRequestEncoder.cleanFiles(); + } client.getRequestCounter().incrementAndGet(); } return this; @@ -119,15 +135,17 @@ public class Http1Transport extends BaseTransport { } catch (URLSyntaxException | IOException e) { logger.log(Level.WARNING, e.getMessage(), e); } - // acknowledge success + // acknowledge success, if possible String channelId = channel.id().toString(); Flow flow = flowMap.get(channelId); - if (flow == null) { - return; - } - CompletableFuture promise = flow.get(flow.lastKey()); - if (promise != null) { - promise.complete(true); + if (flow != null) { + Integer lastKey = flow.lastKey(); + if (lastKey != null) { + CompletableFuture promise = flow.get(lastKey); + if (promise != null) { + promise.complete(true); + } + } } } finally { if (requestKey != null) { diff --git a/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/Http2Transport.java b/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/Http2Transport.java index 7a44ddc..06b8ff1 100644 --- a/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/Http2Transport.java +++ b/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/Http2Transport.java @@ -4,6 +4,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpContentDecompressor; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http2.DefaultHttp2DataFrame; @@ -13,6 +14,7 @@ import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Settings; import io.netty.handler.codec.http2.Http2StreamChannel; import io.netty.handler.codec.http2.Http2StreamChannelBootstrap; +import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec; import io.netty.handler.codec.http2.HttpConversionUtil; import io.netty.util.AsciiString; import org.xbib.net.URLSyntaxException; @@ -21,7 +23,6 @@ import org.xbib.netty.http.client.api.Transport; import org.xbib.netty.http.client.cookie.ClientCookieDecoder; import org.xbib.netty.http.client.cookie.ClientCookieEncoder; import org.xbib.netty.http.client.handler.http2.Http2ResponseHandler; -import org.xbib.netty.http.client.handler.http2.Http2StreamFrameToHttpObjectCodec; import org.xbib.netty.http.common.DefaultHttpResponse; import org.xbib.netty.http.common.HttpAddress; import org.xbib.netty.http.client.api.Request; @@ -56,6 +57,8 @@ public class Http2Transport extends BaseTransport { ChannelPipeline p = ch.pipeline(); p.addLast("child-client-frame-converter", new Http2StreamFrameToHttpObjectCodec(false)); + p.addLast("child-client-decompressor", + new HttpContentDecompressor()); p.addLast("child-client-chunk-aggregator", new HttpObjectAggregator(client.getClientConfig().getMaxContentLength())); p.addLast("child-client-response-handler", diff --git a/netty-http-client/src/test/java/org/xbib/netty/http/client/test/http2/GoogleTest.java b/netty-http-client/src/test/java/org/xbib/netty/http/client/test/http2/GoogleTest.java index cf3636f..b7fbbd0 100644 --- a/netty-http-client/src/test/java/org/xbib/netty/http/client/test/http2/GoogleTest.java +++ b/netty-http-client/src/test/java/org/xbib/netty/http/client/test/http2/GoogleTest.java @@ -17,7 +17,8 @@ public class GoogleTest { .build(); try { // TODO decompression of frames - Request request2 = Request.get().url("https://google.com").setVersion("HTTP/2.0") + Request request2 = Request.get().url("https://google.com") + .setVersion("HTTP/2.0") .setResponseListener(resp -> logger.log(Level.INFO, "got HTTP/2 response: " + resp.getHeaders() + resp.getBodyAsString(StandardCharsets.UTF_8))) .build(); diff --git a/netty-http-common/src/main/java/org/xbib/netty/http/common/mime/MalvaMimeMultipartParser.java b/netty-http-common/src/main/java/org/xbib/netty/http/common/mime/MalvaMimeMultipartParser.java new file mode 100644 index 0000000..1023d60 --- /dev/null +++ b/netty-http-common/src/main/java/org/xbib/netty/http/common/mime/MalvaMimeMultipartParser.java @@ -0,0 +1,229 @@ +package org.xbib.netty.http.common.mime; + +import io.netty.buffer.ByteBuf; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.LinkedHashMap; +import java.util.Locale; +import java.util.Map; + +/** + * A MIME multi part message parser (RFC 2046). + */ +public class MalvaMimeMultipartParser implements MimeMultipartParser { + + private String contentType; + + private byte[] boundary; + + private ByteBuf payload; + + private String type; + + private String subType; + + public MalvaMimeMultipartParser(String contentType, ByteBuf payload) { + this.contentType = contentType; + this.payload = payload; + if (contentType != null) { + int pos = contentType.indexOf(';'); + this.type = pos >= 0 ? contentType.substring(0, pos) : contentType; + this.type = type.trim().toLowerCase(); + this.subType = type.startsWith("multipart") ? type.substring(10).trim() : null; + Map m = parseHeaderLine(contentType); + this.boundary = m.containsKey("boundary") ? m.get("boundary").toString().getBytes(StandardCharsets.US_ASCII) : null; + } + } + + @Override + public String type() { + return type; + } + + @Override + public String subType() { + return subType; + } + + @Override + public void parse(MimeMultipartListener listener) throws IOException { + if (boundary == null) { + return; + } + // Assumption: header is in 8 bytes (ISO-8859-1). Convert to Unicode. + StringBuilder sb = new StringBuilder(); + boolean inHeader = true; + boolean inBody = false; + Integer start = null; + Map headers = new LinkedHashMap<>(); + int eol = 0; + byte[] payloadBytes = payload.array(); + for (int i = 0; i < payloadBytes.length; i++) { + byte b = payloadBytes[i]; + if (inHeader) { + switch (b) { + case '\r': + break; + case '\n': + if (sb.length() > 0) { + String[] s = sb.toString().split(":"); + String k = s[0]; + String v = s[1]; + if (!k.startsWith("--")) { + headers.put(k.toLowerCase(Locale.ROOT), v.trim()); + } + eol = 0; + sb.setLength(0); + } else { + eol++; + if (eol >= 1) { + eol = 0; + sb.setLength(0); + inHeader = false; + inBody = true; + } + } + break; + default: + eol = 0; + sb.append(b); + break; + } + } + if (inBody) { + int len = headers.containsKey("content-length") ? + Integer.parseInt(headers.get("content-length")) : -1; + if (len > 0) { + inBody = false; + inHeader = true; + } else { + if (start == null) { + if (b != '\r' && b != '\n') { + start = i; + } + } + if (start != null) { + i = indexOf(payloadBytes, boundary, start, payloadBytes.length); + if (i == -1) { + throw new IOException("boundary not found"); + } + int l = i - start; + if (l > 4) { + l = l - 4; + } + //BytesReference body = new BytesArray(payloadBytes, start, l) + ByteBuf body = payload.retainedSlice(start, l); + Map m = new LinkedHashMap<>(); + for (Map.Entry entry : headers.entrySet()) { + m.putAll(parseHeaderLine(entry.getValue())); + } + headers.putAll(m); + if (listener != null) { + listener.handle(type, subType, new MimePart(headers, body)); + } + inBody = false; + inHeader = true; + headers = new LinkedHashMap<>(); + start = null; + eol = -1; + } + } + } + } + } + + private Map parseHeaderLine(String line) { + Map params = new LinkedHashMap<>(); + int pos = line.indexOf(";"); + String spec = line.substring(pos + 1); + if (pos < 0) { + return params; + } + String key = ""; + String value; + boolean inKey = true; + boolean inString = false; + int start = 0; + int i; + for (i = 0; i < spec.length(); i++) { + switch (spec.charAt(i)) { + case '=': + if (inKey) { + key = spec.substring(start, i).trim().toLowerCase(); + start = i + 1; + inKey = false; + } else if (!inString) { + throw new IllegalArgumentException(contentType + " value has illegal character '=' at " + i + ": " + spec); + } + break; + case ';': + if (inKey) { + if (spec.substring(start, i).trim().length() > 0) { + throw new IllegalArgumentException(contentType + " parameter missing value at " + i + ": " + spec); + } else { + throw new IllegalArgumentException(contentType + " parameter key has illegal character ';' at " + i + ": " + spec); + } + } else if (!inString) { + value = spec.substring(start, i).trim(); + params.put(key, value); + key = null; + start = i + 1; + inKey = true; + } + break; + case '"': + if (inKey) { + throw new IllegalArgumentException(contentType + " key has illegal character '\"' at " + i + ": " + spec); + } else if (inString) { + value = spec.substring(start, i).trim(); + params.put(key, value); + key = null; + for (i++; i < spec.length() && spec.charAt(i) != ';'; i++) { + if (!Character.isWhitespace(spec.charAt(i))) { + throw new IllegalArgumentException(contentType + " value has garbage after quoted string at " + i + ": " + spec); + } + } + start = i + 1; + inString = false; + inKey = true; + } else { + if (spec.substring(start, i).trim().length() > 0) { + throw new IllegalArgumentException(contentType + " value has garbage before quoted string at " + i + ": " + spec); + } + start = i + 1; + inString = true; + } + break; + } + } + if (inKey) { + if (pos > start && spec.substring(start, i).trim().length() > 0) { + throw new IllegalArgumentException(contentType + " missing value at " + i + ": " + spec); + } + } else if (!inString) { + value = spec.substring(start, i).trim(); + params.put(key, value); + } else { + throw new IllegalArgumentException(contentType + " has an unterminated quoted string: " + spec); + } + return params; + } + + private static int indexOf(byte[] array, byte[] target, int start, int end) { + if (target.length == 0) { + return 0; + } + outer: + for (int i = start; i < end - target.length + 1; i++) { + for (int j = 0; j < target.length; j++) { + if (array[i + j] != target[j]) { + continue outer; + } + } + return i; + } + return -1; + } +} + + diff --git a/netty-http-common/src/main/java/org/xbib/netty/http/common/mime/MimeMultipart.java b/netty-http-common/src/main/java/org/xbib/netty/http/common/mime/MimeMultipart.java new file mode 100644 index 0000000..11d7452 --- /dev/null +++ b/netty-http-common/src/main/java/org/xbib/netty/http/common/mime/MimeMultipart.java @@ -0,0 +1,13 @@ +package org.xbib.netty.http.common.mime; + +import io.netty.buffer.ByteBuf; +import java.util.Map; + +public interface MimeMultipart { + + Map headers(); + + ByteBuf body(); + + int length(); +} diff --git a/netty-http-common/src/main/java/org/xbib/netty/http/common/mime/MimeMultipartListener.java b/netty-http-common/src/main/java/org/xbib/netty/http/common/mime/MimeMultipartListener.java new file mode 100644 index 0000000..cbf4b55 --- /dev/null +++ b/netty-http-common/src/main/java/org/xbib/netty/http/common/mime/MimeMultipartListener.java @@ -0,0 +1,6 @@ +package org.xbib.netty.http.common.mime; + +public interface MimeMultipartListener { + + void handle(String type, String subtype, MimeMultipart part); +} diff --git a/netty-http-common/src/main/java/org/xbib/netty/http/common/mime/MimeMultipartParser.java b/netty-http-common/src/main/java/org/xbib/netty/http/common/mime/MimeMultipartParser.java new file mode 100644 index 0000000..027ec60 --- /dev/null +++ b/netty-http-common/src/main/java/org/xbib/netty/http/common/mime/MimeMultipartParser.java @@ -0,0 +1,12 @@ +package org.xbib.netty.http.common.mime; + +import java.io.IOException; + +public interface MimeMultipartParser { + + String type(); + + String subType(); + + void parse(MimeMultipartListener listener) throws IOException; +} diff --git a/netty-http-common/src/main/java/org/xbib/netty/http/common/mime/MimePart.java b/netty-http-common/src/main/java/org/xbib/netty/http/common/mime/MimePart.java new file mode 100644 index 0000000..ae82fef --- /dev/null +++ b/netty-http-common/src/main/java/org/xbib/netty/http/common/mime/MimePart.java @@ -0,0 +1,41 @@ +package org.xbib.netty.http.common.mime; + +import io.netty.buffer.ByteBuf; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +public class MimePart implements MimeMultipart { + + Map headers; + + ByteBuf body; + + int length; + + MimePart(Map headers, ByteBuf body) { + this.headers = headers; + this.body = body; + this.length = body.readableBytes(); + } + + @Override + public Map headers() { + return headers; + } + + @Override + public ByteBuf body() { + return body; + } + + @Override + public int length() { + return length; + } + + @Override + public String toString() { + String b = body != null ? body.toString(StandardCharsets.UTF_8) : ""; + return "headers=" + headers + " length=" + length + " body=" + b; + } +} diff --git a/netty-http-server-reactive/NOTICE.txt b/netty-http-server-reactive/NOTICE.txt new file mode 100644 index 0000000..a292203 --- /dev/null +++ b/netty-http-server-reactive/NOTICE.txt @@ -0,0 +1,3 @@ +This work is based on + +https://github.com/playframework/netty-reactive-streams/ diff --git a/netty-http-server-reactive/src/main/java/org/xbib/netty/http/server/reactive/HandlerPublisher.java b/netty-http-server-reactive/src/main/java/org/xbib/netty/http/server/reactive/HandlerPublisher.java index abbd0f7..03a3ec5 100644 --- a/netty-http-server-reactive/src/main/java/org/xbib/netty/http/server/reactive/HandlerPublisher.java +++ b/netty-http-server-reactive/src/main/java/org/xbib/netty/http/server/reactive/HandlerPublisher.java @@ -166,9 +166,7 @@ public class HandlerPublisher extends ChannelDuplexHandler implements Publish if (subscriber == null) { throw new NullPointerException("Null subscriber"); } - if (!hasSubscriber.compareAndSet(false, true)) { - // Must call onSubscribe first. subscriber.onSubscribe(new Subscription() { @Override public void request(long n) { @@ -179,12 +177,7 @@ public class HandlerPublisher extends ChannelDuplexHandler implements Publish }); subscriber.onError(new IllegalStateException("This publisher only supports one subscriber")); } else { - executor.execute(new Runnable() { - @Override - public void run() { - provideSubscriber(subscriber); - } - }); + executor.execute(() -> provideSubscriber(subscriber)); } } @@ -216,8 +209,6 @@ public class HandlerPublisher extends ChannelDuplexHandler implements Publish @Override public void handlerAdded(ChannelHandlerContext ctx) { - // If the channel is not yet registered, then it's not safe to invoke any methods on it, eg read() or close() - // So don't provide the context until it is registered. if (ctx.channel().isRegistered()) { provideChannelContext(ctx); } @@ -234,7 +225,6 @@ public class HandlerPublisher extends ChannelDuplexHandler implements Publish case NO_SUBSCRIBER_OR_CONTEXT: verifyRegisteredWithRightExecutor(ctx); this.ctx = ctx; - // It's set, we don't have a subscriber state = NO_SUBSCRIBER; break; case NO_CONTEXT: @@ -244,7 +234,7 @@ public class HandlerPublisher extends ChannelDuplexHandler implements Publish subscriber.onSubscribe(new ChannelSubscription()); break; default: - // Ignore, this could be invoked twice by both handlerAdded and channelRegistered. + break; } } @@ -256,7 +246,6 @@ public class HandlerPublisher extends ChannelDuplexHandler implements Publish @Override public void channelActive(ChannelHandlerContext ctx) { - // If we subscribed before the channel was active, then our read would have been ignored. if (state == DEMANDING) { requestDemand(); } @@ -278,19 +267,16 @@ public class HandlerPublisher extends ChannelDuplexHandler implements Publish case IDLE: if (addDemand(demand)) { - // Important to change state to demanding before doing a read, in case we get a synchronous - // read back. state = DEMANDING; requestDemand(); } break; default: - + break; } } private boolean addDemand(long demand) { - if (demand <= 0) { illegalDemand(); return false; @@ -320,7 +306,7 @@ public class HandlerPublisher extends ChannelDuplexHandler implements Publish if (outstandingDemand > 0) { if (state == BUFFERING) { state = DEMANDING; - } // otherwise we're draining + } requestDemand(); } else if (state == BUFFERING) { state = IDLE; @@ -409,7 +395,6 @@ public class HandlerPublisher extends ChannelDuplexHandler implements Publish } private void complete() { - switch (state) { case NO_SUBSCRIBER: case BUFFERING: @@ -422,8 +407,8 @@ public class HandlerPublisher extends ChannelDuplexHandler implements Publish state = DONE; break; case NO_SUBSCRIBER_ERROR: - // Ignore, we're already going to complete the stream with an error - // when the subscriber subscribes. + break; + default: break; } } @@ -444,6 +429,8 @@ public class HandlerPublisher extends ChannelDuplexHandler implements Publish cleanup(); subscriber.onError(cause); break; + default: + break; } } @@ -464,7 +451,7 @@ public class HandlerPublisher extends ChannelDuplexHandler implements Publish @Override public void cancel() { - executor.execute(() -> receivedCancel()); + executor.execute(HandlerPublisher.this::receivedCancel); } } diff --git a/netty-http-server-reactive/src/main/java/org/xbib/netty/http/server/reactive/HandlerSubscriber.java b/netty-http-server-reactive/src/main/java/org/xbib/netty/http/server/reactive/HandlerSubscriber.java index 51a4575..12a5697 100644 --- a/netty-http-server-reactive/src/main/java/org/xbib/netty/http/server/reactive/HandlerSubscriber.java +++ b/netty-http-server-reactive/src/main/java/org/xbib/netty/http/server/reactive/HandlerSubscriber.java @@ -103,16 +103,13 @@ public class HandlerSubscriber extends ChannelDuplexHandler implements Subscr switch (state) { case NO_SUBSCRIPTION_OR_CONTEXT: this.ctx = ctx; - // We were in no subscription or context, now we just don't have a subscription. state = NO_SUBSCRIPTION; break; case NO_CONTEXT: this.ctx = ctx; - // We were in no context, we're now fully initialised maybeStart(); break; case COMPLETE: - // We are complete, close state = COMPLETE; ctx.close(); break; @@ -175,6 +172,8 @@ public class HandlerSubscriber extends ChannelDuplexHandler implements Subscr subscription.cancel(); state = CANCELLED; break; + default: + break; } } @@ -201,6 +200,8 @@ public class HandlerSubscriber extends ChannelDuplexHandler implements Subscr case CANCELLED: subscription.cancel(); break; + default: + break; } } @@ -248,6 +249,9 @@ public class HandlerSubscriber extends ChannelDuplexHandler implements Subscr ctx.close(); state = COMPLETE; break; + default: + break; + } }); } @@ -255,7 +259,6 @@ public class HandlerSubscriber extends ChannelDuplexHandler implements Subscr private void maybeRequestMore() { if (outstandingDemand <= demandLowWatermark && ctx.channel().isWritable()) { long toRequest = demandHighWatermark - outstandingDemand; - outstandingDemand = demandHighWatermark; subscription.request(toRequest); } diff --git a/netty-http-server-reactive/src/main/java/org/xbib/netty/http/server/reactive/HttpStreamsClientHandler.java b/netty-http-server-reactive/src/main/java/org/xbib/netty/http/server/reactive/HttpStreamsClientHandler.java index 51ecbfd..0b9a66b 100644 --- a/netty-http-server-reactive/src/main/java/org/xbib/netty/http/server/reactive/HttpStreamsClientHandler.java +++ b/netty-http-server-reactive/src/main/java/org/xbib/netty/http/server/reactive/HttpStreamsClientHandler.java @@ -51,21 +51,16 @@ public class HttpStreamsClientHandler extends HttpStreamsHandler= 100 && response.status().code() < 200) { return false; } - if (response.status().equals(HttpResponseStatus.NO_CONTENT) || response.status().equals(HttpResponseStatus.NOT_MODIFIED)) { return false; } - if (HttpUtil.isTransferEncodingChunked(response)) { return true; } - - if (HttpUtil.isContentLengthSet(response)) { return HttpUtil.getContentLength(response) > 0; } - return true; } @@ -132,7 +127,7 @@ public class HttpStreamsClientHandler extends HttpStreamsHandler()); + awaiting100ContinueMessage.subscribe(new CancelledSubscriber<>()); awaiting100ContinueMessage = null; awaiting100Continue.onSubscribe(new Subscription() { public void request(long n) { diff --git a/netty-http-server-reactive/src/main/java/org/xbib/netty/http/server/reactive/HttpStreamsHandler.java b/netty-http-server-reactive/src/main/java/org/xbib/netty/http/server/reactive/HttpStreamsHandler.java index 19769f3..57b70d4 100644 --- a/netty-http-server-reactive/src/main/java/org/xbib/netty/http/server/reactive/HttpStreamsHandler.java +++ b/netty-http-server-reactive/src/main/java/org/xbib/netty/http/server/reactive/HttpStreamsHandler.java @@ -1,7 +1,6 @@ package org.xbib.netty.http.server.reactive; import io.netty.channel.ChannelDuplexHandler; -import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; @@ -136,43 +135,25 @@ abstract class HttpStreamsHandler publisher = new HandlerPublisher(ctx.executor(), HttpContent.class) { + HandlerPublisher publisher = new HandlerPublisher<>(ctx.executor(), HttpContent.class) { @Override protected void cancelled() { if (ctx.executor().inEventLoop()) { handleCancelled(ctx, inMsg); } else { - ctx.executor().execute(new Runnable() { - @Override - public void run() { - handleCancelled(ctx, inMsg); - } - }); + ctx.executor().execute(() -> handleCancelled(ctx, inMsg)); } } @@ -182,7 +163,6 @@ abstract class HttpStreamsHandler 0 || !((LastHttpContent) content).trailingHeaders().isEmpty()) { - // It has data or trailing headers, send them ctx.fireChannelRead(content); } else { ReferenceCountUtil.release(content); } - removeHandlerIfActive(ctx, ctx.name() + "-body-publisher"); currentlyStreamedMessage = null; consumedInMessage(ctx); - } else { ctx.fireChannelRead(content); } - } else { ReferenceCountUtil.release(content); if (content instanceof LastHttpContent) { @@ -232,7 +206,7 @@ abstract class HttpStreamsHandler + executeInEventLoop(ctx, () -> { + sentOutMessage(ctx); + outgoing.remove(); + flushNext(ctx); + })); } else if (out.message instanceof StreamedHttpMessage) { - StreamedHttpMessage streamed = (StreamedHttpMessage) out.message; - HandlerSubscriber subscriber = new HandlerSubscriber(ctx.executor()) { + HandlerSubscriber subscriber = new HandlerSubscriber<>(ctx.executor()) { @Override protected void error(Throwable error) { out.promise.tryFailure(error); @@ -295,45 +253,26 @@ abstract class HttpStreamsHandler completeBody(ctx)); } }; - sendLastHttpContent = true; - - // DON'T pass the promise through, create a new promise instead. ctx.writeAndFlush(out.message); - ctx.pipeline().addAfter(ctx.name(), ctx.name() + "-body-subscriber", subscriber); subscribeSubscriberToStream(streamed, subscriber); } - } private void completeBody(final ChannelHandlerContext ctx) { removeHandlerIfActive(ctx, ctx.name() + "-body-subscriber"); - if (sendLastHttpContent) { ChannelPromise promise = outgoing.peek().promise; ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, promise).addListener( - new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) throws Exception { - executeInEventLoop(ctx, new Runnable() { - @Override - public void run() { - outgoing.remove(); - sentOutMessage(ctx); - flushNext(ctx); - } - }); - } - } + (ChannelFutureListener) channelFuture -> executeInEventLoop(ctx, () -> { + outgoing.remove(); + sentOutMessage(ctx); + flushNext(ctx); + }) ); } else { outgoing.remove().promise.setSuccess(); @@ -374,7 +313,7 @@ abstract class HttpStreamsHandler dependentHandlers; public HttpStreamsServerHandler() { - this(Collections.emptyList()); + this(Collections.emptyList()); } /** @@ -89,11 +89,8 @@ public class HttpStreamsServerHandler extends HttpStreamsHandler.Outgoing out) { - if (out.message instanceof WebSocketHttpResponse) { if ((lastRequest instanceof FullHttpRequest) || !hasBody(lastRequest)) { handleWebSocketResponse(ctx, out); } else { - // If the response has a streamed body, then we can't send the WebSocket response until we've received - // the body. webSocketResponse = out; } } else { @@ -150,8 +143,6 @@ public class HttpStreamsServerHandler extends HttpStreamsHandler()); } else { - // First, insert new handlers in the chain after us for handling the websocket ChannelPipeline pipeline = ctx.pipeline(); HandlerPublisher publisher = new HandlerPublisher<>(ctx.executor(), WebSocketFrame.class); HandlerSubscriber subscriber = new HandlerSubscriber<>(ctx.executor()); pipeline.addAfter(ctx.executor(), ctx.name(), "websocket-subscriber", subscriber); pipeline.addAfter(ctx.executor(), ctx.name(), "websocket-publisher", publisher); - - // Now remove ourselves from the chain ctx.pipeline().remove(ctx.name()); - - // Now do the handshake - // Wrap the request in an empty request because we don't need the WebSocket handshaker ignoring the body, - // we already have handled the body. handshaker.handshake(ctx.channel(), new EmptyHttpRequest(lastRequest)); - - // And hook up the subscriber/publishers response.subscribe(subscriber); publisher.subscribe(response); } - } @Override diff --git a/netty-http-server/src/main/java/org/xbib/netty/http/server/handler/http/Http1ChannelInitializer.java b/netty-http-server/src/main/java/org/xbib/netty/http/server/handler/http/Http1ChannelInitializer.java index dda298b..c0892bb 100644 --- a/netty-http-server/src/main/java/org/xbib/netty/http/server/handler/http/Http1ChannelInitializer.java +++ b/netty-http-server/src/main/java/org/xbib/netty/http/server/handler/http/Http1ChannelInitializer.java @@ -79,6 +79,7 @@ public class Http1ChannelInitializer extends ChannelInitializer private void configureCleartext(Channel channel) { ChannelPipeline pipeline = channel.pipeline(); + pipeline.addLast("http-server-chunked-write", new ChunkedWriteHandler()); pipeline.addLast("http-server-codec", new HttpServerCodec(serverConfig.getMaxInitialLineLength(), serverConfig.getMaxHeadersSize(), serverConfig.getMaxChunkSize())); @@ -96,7 +97,6 @@ public class Http1ChannelInitializer extends ChannelInitializer httpObjectAggregator.setMaxCumulationBufferComponents(serverConfig.getMaxCompositeBufferComponents()); pipeline.addLast("http-server-aggregator", httpObjectAggregator); pipeline.addLast("http-server-pipelining", new HttpPipeliningHandler(serverConfig.getPipeliningCapacity())); - pipeline.addLast("http-server-chunked-write", new ChunkedWriteHandler()); pipeline.addLast("http-server-handler", new HttpHandler(server)); pipeline.addLast("http-idle-timeout-handler", new IdleTimeoutHandler(serverConfig.getIdleTimeoutMillis())); } diff --git a/netty-http-server/src/main/java/org/xbib/netty/http/server/handler/http2/Http2ChannelInitializer.java b/netty-http-server/src/main/java/org/xbib/netty/http/server/handler/http2/Http2ChannelInitializer.java index 314c69a..d0f63ac 100644 --- a/netty-http-server/src/main/java/org/xbib/netty/http/server/handler/http2/Http2ChannelInitializer.java +++ b/netty-http-server/src/main/java/org/xbib/netty/http/server/handler/http2/Http2ChannelInitializer.java @@ -21,6 +21,7 @@ import io.netty.handler.codec.http2.Http2MultiplexCodec; import io.netty.handler.codec.http2.Http2MultiplexCodecBuilder; import io.netty.handler.codec.http2.Http2ServerUpgradeCodec; import io.netty.handler.codec.http2.Http2Settings; +import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec; import io.netty.handler.logging.LogLevel; import io.netty.handler.ssl.SslContext; import io.netty.handler.stream.ChunkedWriteHandler; diff --git a/netty-http-server/src/main/java/org/xbib/netty/http/server/handler/http2/Http2StreamFrameToHttpObjectCodec.java b/netty-http-server/src/main/java/org/xbib/netty/http/server/handler/http2/Http2StreamFrameToHttpObjectCodec.java deleted file mode 100644 index 146ae35..0000000 --- a/netty-http-server/src/main/java/org/xbib/netty/http/server/handler/http2/Http2StreamFrameToHttpObjectCodec.java +++ /dev/null @@ -1,215 +0,0 @@ -package org.xbib.netty.http.server.handler.http2; - -import io.netty.buffer.ByteBufAllocator; -import io.netty.buffer.Unpooled; -import io.netty.channel.Channel; -import io.netty.channel.ChannelHandler; -import io.netty.channel.ChannelHandler.Sharable; -import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.EncoderException; -import io.netty.handler.codec.MessageToMessageCodec; -import io.netty.handler.codec.http.DefaultHttpContent; -import io.netty.handler.codec.http.DefaultLastHttpContent; -import io.netty.handler.codec.http.FullHttpMessage; -import io.netty.handler.codec.http.FullHttpResponse; -import io.netty.handler.codec.http.HttpContent; -import io.netty.handler.codec.http.HttpHeaderNames; -import io.netty.handler.codec.http.HttpHeaderValues; -import io.netty.handler.codec.http.HttpMessage; -import io.netty.handler.codec.http.HttpObject; -import io.netty.handler.codec.http.HttpRequest; -import io.netty.handler.codec.http.HttpResponse; -import io.netty.handler.codec.http.HttpResponseStatus; -import io.netty.handler.codec.http.HttpScheme; -import io.netty.handler.codec.http.HttpUtil; -import io.netty.handler.codec.http.HttpVersion; -import io.netty.handler.codec.http.LastHttpContent; -import io.netty.handler.codec.http2.DefaultHttp2DataFrame; -import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame; -import io.netty.handler.codec.http2.Http2DataFrame; -import io.netty.handler.codec.http2.Http2Exception; -import io.netty.handler.codec.http2.Http2Headers; -import io.netty.handler.codec.http2.Http2HeadersFrame; -import io.netty.handler.codec.http2.Http2StreamChannel; -import io.netty.handler.codec.http2.Http2StreamFrame; -import io.netty.handler.codec.http2.HttpConversionUtil; -import io.netty.handler.ssl.SslHandler; -import io.netty.util.internal.UnstableApi; - -import java.util.List; - -/** - * This handler converts from {@link Http2StreamFrame} to {@link HttpObject}, - * and back. It can be used as an adapter to make http/2 connections backward-compatible with - * {@link ChannelHandler}s expecting {@link HttpObject}. - * - * For simplicity, it converts to chunked encoding unless the entire stream - * is a single header. - */ -@UnstableApi -@Sharable -public class Http2StreamFrameToHttpObjectCodec extends MessageToMessageCodec { - - private final boolean isServer; - - private final boolean validateHeaders; - - private HttpScheme scheme; - - public Http2StreamFrameToHttpObjectCodec(final boolean isServer, - final boolean validateHeaders) { - this.isServer = isServer; - this.validateHeaders = validateHeaders; - scheme = HttpScheme.HTTP; - } - - public Http2StreamFrameToHttpObjectCodec(final boolean isServer) { - this(isServer, true); - } - - @Override - public boolean acceptInboundMessage(Object msg) throws Exception { - return (msg instanceof Http2HeadersFrame) || (msg instanceof Http2DataFrame); - } - - @Override - protected void decode(ChannelHandlerContext ctx, Http2StreamFrame frame, List out) throws Exception { - if (frame instanceof Http2HeadersFrame) { - int id = frame.stream() != null ? frame.stream().id() : -1; - Http2HeadersFrame headersFrame = (Http2HeadersFrame) frame; - Http2Headers headers = headersFrame.headers(); - final CharSequence status = headers.status(); - // 100-continue response is a special case where Http2HeadersFrame#isEndStream=false - // but we need to decode it as a FullHttpResponse to play nice with HttpObjectAggregator. - if (null != status && HttpResponseStatus.CONTINUE.codeAsText().contentEquals(status)) { - final FullHttpMessage fullMsg = newFullMessage(id, headers, ctx.alloc()); - out.add(fullMsg); - return; - } - if (headersFrame.isEndStream()) { - if (headers.method() == null && status == null) { - LastHttpContent last = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, validateHeaders); - HttpConversionUtil.addHttp2ToHttpHeaders(id, headers, last.trailingHeaders(), - HttpVersion.HTTP_1_1, true, true); - out.add(last); - } else { - FullHttpMessage full = newFullMessage(id, headers, ctx.alloc()); - out.add(full); - } - } else { - HttpMessage req = newMessage(id, headers); - if (!HttpUtil.isContentLengthSet(req)) { - req.headers().add(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED); - } - out.add(req); - } - } else if (frame instanceof Http2DataFrame) { - Http2DataFrame dataFrame = (Http2DataFrame) frame; - if (dataFrame.isEndStream()) { - out.add(new DefaultLastHttpContent(dataFrame.content().retain(), validateHeaders)); - } else { - out.add(new DefaultHttpContent(dataFrame.content().retain())); - } - } - } - - private void encodeLastContent(LastHttpContent last, List out) { - boolean needFiller = !(last instanceof FullHttpMessage) && last.trailingHeaders().isEmpty(); - if (last.content().isReadable() || needFiller) { - out.add(new DefaultHttp2DataFrame(last.content().retain(), last.trailingHeaders().isEmpty())); - } - if (!last.trailingHeaders().isEmpty()) { - Http2Headers headers = HttpConversionUtil.toHttp2Headers(last.trailingHeaders(), validateHeaders); - out.add(new DefaultHttp2HeadersFrame(headers, true)); - } - } - - /** - * Encode from an {@link HttpObject} to an {@link Http2StreamFrame}. This method will - * be called for each written message that can be handled by this encoder. - * - * NOTE: 100-Continue responses that are NOT {@link FullHttpResponse} will be rejected. - * - * @param ctx the {@link ChannelHandlerContext} which this handler belongs to - * @param obj the {@link HttpObject} message to encode - * @param out the {@link List} into which the encoded msg should be added - * needs to do some kind of aggregation - * @throws Exception is thrown if an error occurs - */ - @Override - protected void encode(ChannelHandlerContext ctx, HttpObject obj, List out) throws Exception { - // 100-continue is typically a FullHttpResponse, but the decoded - // Http2HeadersFrame should not be marked as endStream=true - if (obj instanceof HttpResponse) { - final HttpResponse res = (HttpResponse) obj; - if (res.status().equals(HttpResponseStatus.CONTINUE)) { - if (res instanceof FullHttpResponse) { - final Http2Headers headers = toHttp2Headers(res); - out.add(new DefaultHttp2HeadersFrame(headers, false)); - return; - } else { - throw new EncoderException( - HttpResponseStatus.CONTINUE.toString() + " must be a FullHttpResponse"); - } - } - } - if (obj instanceof HttpMessage) { - Http2Headers headers = toHttp2Headers((HttpMessage) obj); - boolean noMoreFrames = false; - if (obj instanceof FullHttpMessage) { - FullHttpMessage full = (FullHttpMessage) obj; - noMoreFrames = !full.content().isReadable() && full.trailingHeaders().isEmpty(); - } - out.add(new DefaultHttp2HeadersFrame(headers, noMoreFrames)); - } - - if (obj instanceof LastHttpContent) { - LastHttpContent last = (LastHttpContent) obj; - encodeLastContent(last, out); - } else if (obj instanceof HttpContent) { - HttpContent cont = (HttpContent) obj; - out.add(new DefaultHttp2DataFrame(cont.content().retain(), false)); - } - } - - private Http2Headers toHttp2Headers(final HttpMessage msg) { - if (msg instanceof HttpRequest) { - msg.headers().set( - HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), - scheme.name()); - } - return HttpConversionUtil.toHttp2Headers(msg, validateHeaders); - } - - private HttpMessage newMessage(final int id, - final Http2Headers headers) throws Http2Exception { - return isServer ? - HttpConversionUtil.toHttpRequest(id, headers, validateHeaders) : - HttpConversionUtil.toHttpResponse(id, headers, validateHeaders); - } - - private FullHttpMessage newFullMessage(final int id, - final Http2Headers headers, - final ByteBufAllocator alloc) throws Http2Exception { - return isServer ? - HttpConversionUtil.toFullHttpRequest(id, headers, alloc, validateHeaders) : - HttpConversionUtil.toFullHttpResponse(id, headers, alloc, validateHeaders); - } - - @Override - public void handlerAdded(final ChannelHandlerContext ctx) throws Exception { - super.handlerAdded(ctx); - // This handler is typically used on an Http2StreamChannel. At this - // stage, ssl handshake should've been established. checking for the - // presence of SslHandler in the parent's channel pipeline to - // determine the HTTP scheme should suffice, even for the case where - // SniHandler is used. - scheme = isSsl(ctx) ? HttpScheme.HTTPS : HttpScheme.HTTP; - } - - protected boolean isSsl(final ChannelHandlerContext ctx) { - final Channel ch = ctx.channel(); - final Channel connChannel = (ch instanceof Http2StreamChannel) ? ch.parent() : ch; - return null != connChannel.pipeline().get(SslHandler.class); - } -} diff --git a/netty-http-server/src/main/java/org/xbib/netty/http/server/transport/HttpServerRequest.java b/netty-http-server/src/main/java/org/xbib/netty/http/server/transport/HttpServerRequest.java index d675d4c..612fc06 100644 --- a/netty-http-server/src/main/java/org/xbib/netty/http/server/transport/HttpServerRequest.java +++ b/netty-http-server/src/main/java/org/xbib/netty/http/server/transport/HttpServerRequest.java @@ -66,8 +66,10 @@ public class HttpServerRequest implements ServerRequest { } void handleParameters() { + if (logger.isLoggable(Level.FINER)) { + logger.log(Level.FINER, () -> "request = " + httpRequest); + } Charset charset = HttpUtil.getCharset(httpRequest, StandardCharsets.UTF_8); - HttpParameters httpParameters = new HttpParameters(); this.url = URL.builder() .charset(charset, CodingErrorAction.REPLACE) .path(httpRequest.uri()) // creates path, query params, fragment @@ -76,31 +78,28 @@ public class HttpServerRequest implements ServerRequest { CharSequence mimeType = HttpUtil.getMimeType(httpRequest); ByteBuf byteBuf = httpRequest.content(); if (logger.isLoggable(Level.FINER)) { - logger.log(Level.FINER, "url = " + url + + logger.log(Level.FINER, () -> "url = " + url + + " charset = " + charset + " mime type = " + mimeType + " queryParameters = " + queryParameters + " body exists = " + (byteBuf != null)); } if (byteBuf != null) { - if (httpRequest.method().equals(HttpMethod.POST) && mimeType != null) { + if (httpRequest.method().equals(HttpMethod.POST)) { String params; // https://www.w3.org/TR/html4/interact/forms.html#h-17.13.4 - if (HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED.toString().equals(mimeType.toString())) { + if (mimeType != null && HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED.toString().equals(mimeType.toString())) { Charset htmlCharset = HttpUtil.getCharset(httpRequest, StandardCharsets.ISO_8859_1); params = byteBuf.toString(htmlCharset).replace('+', ' '); if (logger.isLoggable(Level.FINER)) { logger.log(Level.FINER, "html form, charset = " + htmlCharset + " param body = " + params); } - } else { - params = byteBuf.toString(charset); - if (logger.isLoggable(Level.FINER)) { - logger.log(Level.FINER, "not a html form, charset = " + charset + " param body = " + params); - } + queryParameters.addPercentEncodedBody(params); + queryParameters.add("_raw", params); } - queryParameters.addPercentEncodedBody(params); - queryParameters.add("_body", params); } } + HttpParameters httpParameters = new HttpParameters(); for (Pair pair : queryParameters) { httpParameters.add(pair.getFirst(), pair.getSecond()); } diff --git a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/NettyHttpTestExtension.java b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/NettyHttpTestExtension.java index a403d99..0bde982 100644 --- a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/NettyHttpTestExtension.java +++ b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/NettyHttpTestExtension.java @@ -23,6 +23,7 @@ public class NettyHttpTestExtension implements BeforeAllCallback { System.setProperty("io.netty.noUnsafe", Boolean.toString(true)); //System.setProperty("io.netty.leakDetection.level", "paranoid"); Level level = Level.INFO; + //Level level = Level.ALL; System.setProperty("java.util.logging.SimpleFormatter.format", "%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$-7s [%3$s] %5$s %6$s%n"); LogManager.getLogManager().reset(); diff --git a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/hacks/HttpPipeliningHandlerTest.java b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/hacks/HttpPipeliningHandlerTest.java index f5b327c..a065904 100644 --- a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/hacks/HttpPipeliningHandlerTest.java +++ b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/hacks/HttpPipeliningHandlerTest.java @@ -19,6 +19,7 @@ import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http.QueryStringDecoder; import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.extension.ExtendWith; @@ -48,6 +49,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; +@Disabled /* flaky */ @TestInstance(TestInstance.Lifecycle.PER_CLASS) @ExtendWith(NettyHttpTestExtension.class) class HttpPipeliningHandlerTest { @@ -166,7 +168,7 @@ class HttpPipeliningHandlerTest { } } - private class WorkEmulatorHandler extends SimpleChannelInboundHandler { + private static class WorkEmulatorHandler extends SimpleChannelInboundHandler { private final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); diff --git a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http1/MimeUploadTest.java b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http1/MimeUploadTest.java new file mode 100644 index 0000000..780b02d --- /dev/null +++ b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http1/MimeUploadTest.java @@ -0,0 +1,83 @@ +package org.xbib.netty.http.server.test.http1; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufOutputStream; +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.multipart.MixedFileUpload; +import static org.junit.jupiter.api.Assertions.assertTrue; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.xbib.netty.http.client.Client; +import org.xbib.netty.http.client.api.Request; +import org.xbib.netty.http.client.api.ResponseListener; +import org.xbib.netty.http.common.HttpAddress; +import org.xbib.netty.http.common.HttpParameters; +import org.xbib.netty.http.common.HttpResponse; +import org.xbib.netty.http.server.Domain; +import org.xbib.netty.http.server.Server; +import org.xbib.netty.http.server.api.ServerResponse; +import org.xbib.netty.http.server.test.NettyHttpTestExtension; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; + +@ExtendWith(NettyHttpTestExtension.class) +class MimeUploadTest { + + private static final Logger logger = Logger.getLogger(MimeUploadTest.class.getName()); + + @Test + void testMimetHttp1() throws Exception { + final AtomicBoolean success1 = new AtomicBoolean(false); + HttpAddress httpAddress = HttpAddress.http1("localhost", 8008); + Domain domain = Domain.builder(httpAddress) + .singleEndpoint("/upload", "/**", (req, resp) -> { + HttpParameters parameters = req.getParameters(); + logger.log(Level.INFO, "got request, headers = " + req.getHeaders() + + " params = " + parameters.toString() + + " body = " + req.getContent().toString(StandardCharsets.UTF_8)); + ServerResponse.write(resp, HttpResponseStatus.OK); + }, "POST") + .build(); + Server server = Server.builder(domain) + .build(); + Client client = Client.builder() + .enableDebug() + .build(); + try { + server.accept(); + + ByteBuf byteBuf = Unpooled.buffer(); + ByteBufOutputStream outputStream = new ByteBufOutputStream(byteBuf); + int max = 10 * 1024; + for (int i = 0; i < max; i++) { + outputStream.writeBytes("Hi"); + } + MixedFileUpload upload = new MixedFileUpload("Test upload", + "test.txt", "text/plain", "binary", + StandardCharsets.UTF_8, byteBuf.readableBytes(), 10 * 1024); + upload.setContent(byteBuf); + ResponseListener responseListener = (resp) -> { + if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) { + success1.set(true); + } + }; + Request postRequest = Request.post() + .setVersion(HttpVersion.HTTP_1_1) + .url(server.getServerConfig().getAddress().base().resolve("/upload")) + .addBodyData(upload) + .setResponseListener(responseListener) + .build(); + client.execute(postRequest).get(); + } finally { + server.shutdownGracefully(); + client.shutdownGracefully(); + logger.log(Level.INFO, "server and client shut down"); + } + assertTrue(success1.get()); + } + +} diff --git a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http1/PostTest.java b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http1/PostTest.java index f369f39..5a57a7b 100644 --- a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http1/PostTest.java +++ b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http1/PostTest.java @@ -241,4 +241,53 @@ class PostTest { assertTrue(success3.get()); assertTrue(success4.get()); } + + @Test + void testPostInvalidPercentEncodings() throws Exception { + final AtomicBoolean success1 = new AtomicBoolean(false); + final AtomicBoolean success2 = new AtomicBoolean(false); + final AtomicBoolean success3 = new AtomicBoolean(false); + HttpAddress httpAddress = HttpAddress.http1("localhost", 8008); + Domain domain = Domain.builder(httpAddress) + .singleEndpoint("/post", "/**", (req, resp) -> { + HttpParameters parameters = req.getParameters(); + logger.log(Level.INFO, "got request " + parameters.toString() + ", sending OK"); + if ("myÿvalue".equals(parameters.getFirst("my param"))) { + success1.set(true); + } + if ("b%YYc".equals(parameters.getFirst("a"))) { + success2.set(true); + } + ServerResponse.write(resp, HttpResponseStatus.OK); + }, "POST") + .build(); + Server server = Server.builder(domain) + .build(); + Client client = Client.builder() + .build(); + try { + server.accept(); + ResponseListener responseListener = (resp) -> { + if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) { + success3.set(true); + } + }; + Request postRequest = Request.post().setVersion(HttpVersion.HTTP_1_1) + .url(server.getServerConfig().getAddress().base().resolve("/post/test.txt")) + .contentType(HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED, StandardCharsets.ISO_8859_1) + .addRawParameter("a", "b%YYc") + .addRawFormParameter("my param", "my%ZZvalue") + .setResponseListener(responseListener) + .build(); + client.execute(postRequest).get(); + } finally { + server.shutdownGracefully(); + client.shutdownGracefully(); + logger.log(Level.INFO, "server and client shut down"); + } + assertTrue(success1.get()); + assertTrue(success2.get()); + assertTrue(success3.get()); + } + }