more feature for form post parameters, chunked upload

This commit is contained in:
Jörg Prante 2019-10-01 09:46:40 +02:00
parent 59ac22d492
commit 53ab059bb3
29 changed files with 578 additions and 621 deletions

View file

@ -1,13 +1,13 @@
group = org.xbib
name = netty-http
version = 4.1.41.1
version = 4.1.41.2
# netty
netty.version = 4.1.41.Final
tcnative.version = 2.0.25.Final
# for netty-http-common
xbib-net-url.version = 2.0.2
xbib-net-url.version = 2.0.3
# for netty-http-server
bouncycastle.version = 1.62

View file

@ -13,6 +13,7 @@ import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.QueryStringEncoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.handler.codec.http2.HttpConversionUtil;
import io.netty.util.AsciiString;
import org.xbib.net.PercentEncoder;
@ -57,6 +58,8 @@ public final class Request {
private final ByteBuf content;
private final List<InterfaceHttpData> bodyData;
private final long timeoutInMillis;
private final boolean followRedirect;
@ -74,7 +77,7 @@ public final class Request {
private ResponseListener<HttpResponse> responseListener;
private Request(URL url, String uri, HttpVersion httpVersion, HttpMethod httpMethod,
HttpHeaders headers, Collection<Cookie> cookies, ByteBuf content,
HttpHeaders headers, Collection<Cookie> cookies, ByteBuf content, List<InterfaceHttpData> bodyData,
long timeoutInMillis, boolean followRedirect, int maxRedirect, int redirectCount,
boolean isBackOff, BackOff backOff, ResponseListener<HttpResponse> responseListener) {
this.url = url;
@ -84,6 +87,7 @@ public final class Request {
this.headers = headers;
this.cookies = cookies;
this.content = content;
this.bodyData = bodyData;
this.timeoutInMillis = timeoutInMillis;
this.followRedirect = followRedirect;
this.maxRedirects = maxRedirect;
@ -126,6 +130,10 @@ public final class Request {
return content;
}
public List<InterfaceHttpData> getBodyData() {
return bodyData;
}
/**
* Return the timeout in milliseconds per request. This overrides the read timeout of the client.
* @return timeout timeout in milliseconds
@ -306,6 +314,8 @@ public final class Request {
private ByteBuf content;
private List<InterfaceHttpData> bodyData;
private long timeoutInMillis;
private boolean followRedirect;
@ -333,6 +343,7 @@ public final class Request {
this.removeHeaders = new ArrayList<>();
this.cookies = new HashSet<>();
this.uriParameters = new HttpParameters();
this.bodyData = new ArrayList<>();
charset(StandardCharsets.UTF_8);
}
@ -439,6 +450,13 @@ public final class Request {
return this;
}
public Builder addRawParameter(String name, String value) {
Objects.requireNonNull(name);
Objects.requireNonNull(value);
uriParameters.add(name, value);
return this;
}
public Builder addFormParameter(String name, String value) {
Objects.requireNonNull(name);
Objects.requireNonNull(value);
@ -446,6 +464,18 @@ public final class Request {
return this;
}
public Builder addRawFormParameter(String name, String value) {
Objects.requireNonNull(name);
Objects.requireNonNull(value);
formParameters.add(name, value);
return this;
}
public Builder addBodyData(InterfaceHttpData data) {
bodyData.add(data);
return this;
}
private String encode(CharSequence contentType, String value) {
if (value == null) {
return null;
@ -631,7 +661,7 @@ public final class Request {
for (String headerName : removeHeaders) {
validatedHeaders.remove(headerName);
}
return new Request(url, uri, httpVersion, httpMethod, validatedHeaders, cookies, content,
return new Request(url, uri, httpVersion, httpMethod, validatedHeaders, cookies, content, bodyData,
timeoutInMillis, followRedirect, maxRedirects, 0, enableBackOff, backOff,
responseListener);
}

View file

@ -11,6 +11,7 @@ import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.ClientConfig;
import org.xbib.netty.http.client.api.HttpChannelInitializer;
@ -63,7 +64,7 @@ public class Http1ChannelInitializer extends ChannelInitializer<Channel> impleme
private void configureEncrypted(Channel channel) {
ChannelPipeline pipeline = channel.pipeline();
SslHandler sslHandler = sslHandlerFactory.create();
pipeline.addLast("ssl-handler", sslHandler);
pipeline.addLast("client-ssl-handler", sslHandler);
if (clientConfig.isEnableNegotiation()) {
ApplicationProtocolNegotiationHandler negotiationHandler =
new ApplicationProtocolNegotiationHandler(ApplicationProtocolNames.HTTP_1_1) {
@ -95,15 +96,17 @@ public class Http1ChannelInitializer extends ChannelInitializer<Channel> impleme
private void configureCleartext(Channel channel) {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new HttpClientCodec(clientConfig.getMaxInitialLineLength(),
//pipeline.addLast("client-chunk-compressor", new HttpChunkContentCompressor(6));
pipeline.addLast("http-client-chunk-writer", new ChunkedWriteHandler());
pipeline.addLast("http-client-codec", new HttpClientCodec(clientConfig.getMaxInitialLineLength(),
clientConfig.getMaxHeadersSize(), clientConfig.getMaxChunkSize()));
if (clientConfig.isEnableGzip()) {
pipeline.addLast(new HttpContentDecompressor());
pipeline.addLast("http-client-decompressor", new HttpContentDecompressor());
}
HttpObjectAggregator httpObjectAggregator = new HttpObjectAggregator(clientConfig.getMaxContentLength(),
false);
httpObjectAggregator.setMaxCumulationBufferComponents(clientConfig.getMaxCompositeBufferComponents());
pipeline.addLast(httpObjectAggregator);
pipeline.addLast(httpResponseHandler);
pipeline.addLast("http-client-aggregator", httpObjectAggregator);
pipeline.addLast("http-client-handler", httpResponseHandler);
}
}

View file

@ -76,12 +76,10 @@ public class Http2ChannelInitializer extends ChannelInitializer<Channel> impleme
Http2MultiplexCodec multiplexCodec = multiplexCodecBuilder.autoAckSettingsFrame(true) .build();
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("client-multiplex", multiplexCodec);
// does not work
//pipeline.addLast("client-decompressor", new HttpContentDecompressor());
pipeline.addLast("client-messages", new ClientMessages());
}
class ClientMessages extends ChannelInboundHandlerAdapter {
static class ClientMessages extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
@ -118,9 +116,9 @@ public class Http2ChannelInitializer extends ChannelInitializer<Channel> impleme
}
}
class PushPromiseHandler extends Http2FrameLogger {
static class PushPromiseHandler extends Http2FrameLogger {
public PushPromiseHandler(LogLevel level, String name) {
PushPromiseHandler(LogLevel level, String name) {
super(level, name);
}

View file

@ -1,225 +0,0 @@
package org.xbib.netty.http.client.handler.http2;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.MessageToMessageCodec;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpScheme;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
import io.netty.handler.codec.http2.Http2DataFrame;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2HeadersFrame;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.codec.http2.Http2StreamFrame;
import io.netty.handler.codec.http2.HttpConversionUtil;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.internal.UnstableApi;
import java.util.List;
/**
* This handler converts from {@link Http2StreamFrame} to {@link HttpObject},
* and back. It can be used as an adapter to make http/2 connections backward-compatible with
* {@link ChannelHandler}s expecting {@link HttpObject}.
*
* For simplicity, it converts to chunked encoding unless the entire stream
* is a single header.
*
* Patched version of original Netty's Http2StreamFrameToHttpObjectCodec.
* This one is using the streamId from {@code frame.stream().id()}.
*/
@UnstableApi
@Sharable
public class Http2StreamFrameToHttpObjectCodec extends MessageToMessageCodec<Http2StreamFrame, HttpObject> {
private final boolean isServer;
private final boolean validateHeaders;
private HttpScheme scheme;
public Http2StreamFrameToHttpObjectCodec(final boolean isServer,
final boolean validateHeaders) {
this.isServer = isServer;
this.validateHeaders = validateHeaders;
scheme = HttpScheme.HTTP;
}
public Http2StreamFrameToHttpObjectCodec(final boolean isServer) {
this(isServer, true);
}
@Override
public boolean acceptInboundMessage(Object msg) throws Exception {
return (msg instanceof Http2HeadersFrame) || (msg instanceof Http2DataFrame);
}
@Override
protected void decode(ChannelHandlerContext ctx, Http2StreamFrame frame, List<Object> out) throws Exception {
if (frame instanceof Http2HeadersFrame) {
int id = frame.stream() != null ? frame.stream().id() : -1;
Http2HeadersFrame headersFrame = (Http2HeadersFrame) frame;
Http2Headers headers = headersFrame.headers();
final CharSequence status = headers.status();
// 100-continue response is a special case where Http2HeadersFrame#isEndStream=false
// but we need to decode it as a FullHttpResponse to play nice with HttpObjectAggregator.
if (null != status && HttpResponseStatus.CONTINUE.codeAsText().contentEquals(status)) {
final FullHttpMessage fullMsg = newFullMessage(id, headers, ctx.alloc());
out.add(fullMsg);
return;
}
if (headersFrame.isEndStream()) {
if (headers.method() == null && status == null) {
LastHttpContent last = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, validateHeaders);
HttpConversionUtil.addHttp2ToHttpHeaders(id, headers, last.trailingHeaders(),
HttpVersion.HTTP_1_1, true, true);
out.add(last);
} else {
FullHttpMessage full = newFullMessage(id, headers, ctx.alloc());
out.add(full);
}
} else {
HttpMessage req = newMessage(id, headers);
if (!HttpUtil.isContentLengthSet(req)) {
req.headers().add(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
}
out.add(req);
}
} else if (frame instanceof Http2DataFrame) {
Http2DataFrame dataFrame = (Http2DataFrame) frame;
if (dataFrame.isEndStream()) {
out.add(new DefaultLastHttpContent(dataFrame.content().retain(), validateHeaders));
} else {
out.add(new DefaultHttpContent(dataFrame.content().retain()));
}
}
}
private void encodeLastContent(LastHttpContent last, List<Object> out) {
boolean needFiller = !(last instanceof FullHttpMessage) && last.trailingHeaders().isEmpty();
if (last.content().isReadable() || needFiller) {
out.add(new DefaultHttp2DataFrame(last.content().retain(), last.trailingHeaders().isEmpty()));
}
if (!last.trailingHeaders().isEmpty()) {
Http2Headers headers = HttpConversionUtil.toHttp2Headers(last.trailingHeaders(), validateHeaders);
out.add(new DefaultHttp2HeadersFrame(headers, true));
}
}
/**
* Encode from an {@link HttpObject} to an {@link Http2StreamFrame}. This method will
* be called for each written message that can be handled by this encoder.
*
* NOTE: 100-Continue responses that are NOT {@link FullHttpResponse} will be rejected.
*
* @param ctx the {@link ChannelHandlerContext} which this handler belongs to
* @param obj the {@link HttpObject} message to encode
* @param out the {@link List} into which the encoded msg should be added
* needs to do some kind of aggregation
* @throws Exception is thrown if an error occurs
*/
@Override
protected void encode(ChannelHandlerContext ctx, HttpObject obj, List<Object> out) throws Exception {
// 100-continue is typically a FullHttpResponse, but the decoded
// Http2HeadersFrame should not be marked as endStream=true
if (obj instanceof HttpResponse) {
final HttpResponse res = (HttpResponse) obj;
if (res.status().equals(HttpResponseStatus.CONTINUE)) {
if (res instanceof FullHttpResponse) {
final Http2Headers headers = toHttp2Headers(res);
out.add(new DefaultHttp2HeadersFrame(headers, false));
return;
} else {
throw new EncoderException(
HttpResponseStatus.CONTINUE.toString() + " must be a FullHttpResponse");
}
}
}
if (obj instanceof HttpMessage) {
Http2Headers headers = toHttp2Headers((HttpMessage) obj);
boolean noMoreFrames = false;
if (obj instanceof FullHttpMessage) {
FullHttpMessage full = (FullHttpMessage) obj;
noMoreFrames = !full.content().isReadable() && full.trailingHeaders().isEmpty();
}
out.add(new DefaultHttp2HeadersFrame(headers, noMoreFrames));
}
if (obj instanceof LastHttpContent) {
LastHttpContent last = (LastHttpContent) obj;
encodeLastContent(last, out);
} else if (obj instanceof HttpContent) {
HttpContent cont = (HttpContent) obj;
out.add(new DefaultHttp2DataFrame(cont.content().retain(), false));
}
}
private Http2Headers toHttp2Headers(final HttpMessage msg) {
if (msg instanceof HttpRequest) {
msg.headers().set(
HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(),
scheme.name());
}
return HttpConversionUtil.toHttp2Headers(msg, validateHeaders);
}
private HttpMessage newMessage(final int id,
final Http2Headers headers) throws Http2Exception {
return isServer ?
HttpConversionUtil.toHttpRequest(id, headers, validateHeaders) :
HttpConversionUtil.toHttpResponse(id, headers, validateHeaders);
}
private FullHttpMessage newFullMessage(final int id,
final Http2Headers headers,
final ByteBufAllocator alloc) throws Http2Exception {
return isServer ?
HttpConversionUtil.toFullHttpRequest(id, headers, alloc, validateHeaders) :
HttpConversionUtil.toFullHttpResponse(id, headers, alloc, validateHeaders);
}
@Override
public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
// this handler is typically used on an Http2StreamChannel. at this
// stage, ssl handshake should've been established. checking for the
// presence of SslHandler in the parent's channel pipeline to
// determine the HTTP scheme should suffice, even for the case where
// SniHandler is used.
scheme = isSsl(ctx) ? HttpScheme.HTTPS : HttpScheme.HTTP;
}
protected boolean isSsl(final ChannelHandlerContext ctx) {
final Channel ch = ctx.channel();
final Channel connChannel = (ch instanceof Http2StreamChannel) ? ch.parent() : ch;
return null != connChannel.pipeline().get(SslHandler.class);
}
}

View file

@ -3,6 +3,8 @@ package org.xbib.netty.http.client.transport;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.multipart.DefaultHttpDataFactory;
import io.netty.handler.codec.http.multipart.HttpDataFactory;
import io.netty.handler.ssl.SslHandler;
import org.xbib.net.PercentDecoder;
import org.xbib.net.URL;
@ -58,12 +60,15 @@ public abstract class BaseTransport implements Transport {
private CookieBox cookieBox;
protected HttpDataFactory httpDataFactory;
BaseTransport(Client client, HttpAddress httpAddress) {
this.client = client;
this.httpAddress = httpAddress;
this.channels = new ConcurrentHashMap<>();
this.flowMap = new ConcurrentHashMap<>();
this.requests = new ConcurrentSkipListMap<>();
this.httpDataFactory = new DefaultHttpDataFactory();
}
@Override
@ -104,6 +109,7 @@ public abstract class BaseTransport implements Transport {
flow.close();
}
channels.clear();
httpDataFactory.cleanAllHttpData();
// do not clear requests
}
@ -296,10 +302,7 @@ public abstract class BaseTransport implements Transport {
hostAndPort.append(':').append(redirUrl.getPort());
}
newHttpRequest.headers().set(HttpHeaderNames.HOST, hostAndPort.toString());
logger.log(Level.FINE, "redirect url: " + redirUrl +
" old request: " + request.toString() +
" new request: " + newHttpRequest.toString());
request.release();
logger.log(Level.FINE, "redirect url: " + redirUrl);
return newHttpRequest;
}
break;
@ -338,7 +341,7 @@ public abstract class BaseTransport implements Transport {
if (backOff != null) {
long millis = backOff.nextBackOffMillis();
if (millis != BackOff.STOP) {
logger.log(Level.FINE, "status = " + status + " backing off request by " + millis + " milliseconds");
logger.log(Level.FINE, () -> "status = " + status + " backing off request by " + millis + " milliseconds");
try {
Thread.sleep(millis);
} catch (InterruptedException e) {

View file

@ -26,11 +26,11 @@ class Flow {
}
Integer firstKey() {
return map.firstKey();
return map.isEmpty() ? null : map.firstKey();
}
Integer lastKey() {
return map.lastKey();
return map.isEmpty() ? null : map.lastKey();
}
void put(Integer key, CompletableFuture<Boolean> promise) {

View file

@ -5,6 +5,7 @@ import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.HttpConversionUtil;
@ -16,8 +17,6 @@ import org.xbib.netty.http.client.cookie.ClientCookieEncoder;
import org.xbib.netty.http.common.DefaultHttpResponse;
import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.client.api.ResponseListener;
import org.xbib.netty.http.common.HttpResponse;
import org.xbib.netty.http.common.cookie.Cookie;
import java.io.IOException;
@ -51,6 +50,7 @@ public class Http1Transport extends BaseTransport {
FullHttpRequest fullHttpRequest = request.content() == null ?
new DefaultFullHttpRequest(request.httpVersion(), request.httpMethod(), uri) :
new DefaultFullHttpRequest(request.httpVersion(), request.httpMethod(), uri, request.content());
HttpPostRequestEncoder httpPostRequestEncoder = null;
final Integer streamId = flowMap.get(channelId).nextStreamId();
if (streamId == null) {
throw new IllegalStateException();
@ -68,9 +68,25 @@ public class Http1Transport extends BaseTransport {
}
// add stream-id and cookie headers
fullHttpRequest.headers().set(request.headers());
// flush after putting request into requests map
if (request.content() == null && !request.getBodyData().isEmpty()) {
try {
httpPostRequestEncoder =
new HttpPostRequestEncoder(httpDataFactory, fullHttpRequest, true);
httpPostRequestEncoder.setBodyHttpDatas(request.getBodyData());
httpPostRequestEncoder.finalizeRequest();
} catch (HttpPostRequestEncoder.ErrorDataEncoderException e) {
throw new IOException(e);
}
}
if (channel.isWritable()) {
channel.writeAndFlush(fullHttpRequest);
channel.write(fullHttpRequest);
if (httpPostRequestEncoder != null && httpPostRequestEncoder.isChunked()) {
channel.write(httpPostRequestEncoder);
}
channel.flush();
if (httpPostRequestEncoder != null) {
httpPostRequestEncoder.cleanFiles();
}
client.getRequestCounter().incrementAndGet();
}
return this;
@ -119,15 +135,17 @@ public class Http1Transport extends BaseTransport {
} catch (URLSyntaxException | IOException e) {
logger.log(Level.WARNING, e.getMessage(), e);
}
// acknowledge success
// acknowledge success, if possible
String channelId = channel.id().toString();
Flow flow = flowMap.get(channelId);
if (flow == null) {
return;
}
CompletableFuture<Boolean> promise = flow.get(flow.lastKey());
if (promise != null) {
promise.complete(true);
if (flow != null) {
Integer lastKey = flow.lastKey();
if (lastKey != null) {
CompletableFuture<Boolean> promise = flow.get(lastKey);
if (promise != null) {
promise.complete(true);
}
}
}
} finally {
if (requestKey != null) {

View file

@ -4,6 +4,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
@ -13,6 +14,7 @@ import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec;
import io.netty.handler.codec.http2.HttpConversionUtil;
import io.netty.util.AsciiString;
import org.xbib.net.URLSyntaxException;
@ -21,7 +23,6 @@ import org.xbib.netty.http.client.api.Transport;
import org.xbib.netty.http.client.cookie.ClientCookieDecoder;
import org.xbib.netty.http.client.cookie.ClientCookieEncoder;
import org.xbib.netty.http.client.handler.http2.Http2ResponseHandler;
import org.xbib.netty.http.client.handler.http2.Http2StreamFrameToHttpObjectCodec;
import org.xbib.netty.http.common.DefaultHttpResponse;
import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.client.api.Request;
@ -56,6 +57,8 @@ public class Http2Transport extends BaseTransport {
ChannelPipeline p = ch.pipeline();
p.addLast("child-client-frame-converter",
new Http2StreamFrameToHttpObjectCodec(false));
p.addLast("child-client-decompressor",
new HttpContentDecompressor());
p.addLast("child-client-chunk-aggregator",
new HttpObjectAggregator(client.getClientConfig().getMaxContentLength()));
p.addLast("child-client-response-handler",

View file

@ -17,7 +17,8 @@ public class GoogleTest {
.build();
try {
// TODO decompression of frames
Request request2 = Request.get().url("https://google.com").setVersion("HTTP/2.0")
Request request2 = Request.get().url("https://google.com")
.setVersion("HTTP/2.0")
.setResponseListener(resp -> logger.log(Level.INFO, "got HTTP/2 response: " +
resp.getHeaders() + resp.getBodyAsString(StandardCharsets.UTF_8)))
.build();

View file

@ -0,0 +1,229 @@
package org.xbib.netty.http.common.mime;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
/**
* A MIME multi part message parser (RFC 2046).
*/
public class MalvaMimeMultipartParser implements MimeMultipartParser {
private String contentType;
private byte[] boundary;
private ByteBuf payload;
private String type;
private String subType;
public MalvaMimeMultipartParser(String contentType, ByteBuf payload) {
this.contentType = contentType;
this.payload = payload;
if (contentType != null) {
int pos = contentType.indexOf(';');
this.type = pos >= 0 ? contentType.substring(0, pos) : contentType;
this.type = type.trim().toLowerCase();
this.subType = type.startsWith("multipart") ? type.substring(10).trim() : null;
Map m = parseHeaderLine(contentType);
this.boundary = m.containsKey("boundary") ? m.get("boundary").toString().getBytes(StandardCharsets.US_ASCII) : null;
}
}
@Override
public String type() {
return type;
}
@Override
public String subType() {
return subType;
}
@Override
public void parse(MimeMultipartListener listener) throws IOException {
if (boundary == null) {
return;
}
// Assumption: header is in 8 bytes (ISO-8859-1). Convert to Unicode.
StringBuilder sb = new StringBuilder();
boolean inHeader = true;
boolean inBody = false;
Integer start = null;
Map<String, String> headers = new LinkedHashMap<>();
int eol = 0;
byte[] payloadBytes = payload.array();
for (int i = 0; i < payloadBytes.length; i++) {
byte b = payloadBytes[i];
if (inHeader) {
switch (b) {
case '\r':
break;
case '\n':
if (sb.length() > 0) {
String[] s = sb.toString().split(":");
String k = s[0];
String v = s[1];
if (!k.startsWith("--")) {
headers.put(k.toLowerCase(Locale.ROOT), v.trim());
}
eol = 0;
sb.setLength(0);
} else {
eol++;
if (eol >= 1) {
eol = 0;
sb.setLength(0);
inHeader = false;
inBody = true;
}
}
break;
default:
eol = 0;
sb.append(b);
break;
}
}
if (inBody) {
int len = headers.containsKey("content-length") ?
Integer.parseInt(headers.get("content-length")) : -1;
if (len > 0) {
inBody = false;
inHeader = true;
} else {
if (start == null) {
if (b != '\r' && b != '\n') {
start = i;
}
}
if (start != null) {
i = indexOf(payloadBytes, boundary, start, payloadBytes.length);
if (i == -1) {
throw new IOException("boundary not found");
}
int l = i - start;
if (l > 4) {
l = l - 4;
}
//BytesReference body = new BytesArray(payloadBytes, start, l)
ByteBuf body = payload.retainedSlice(start, l);
Map<String, String> m = new LinkedHashMap<>();
for (Map.Entry<String, String> entry : headers.entrySet()) {
m.putAll(parseHeaderLine(entry.getValue()));
}
headers.putAll(m);
if (listener != null) {
listener.handle(type, subType, new MimePart(headers, body));
}
inBody = false;
inHeader = true;
headers = new LinkedHashMap<>();
start = null;
eol = -1;
}
}
}
}
}
private Map<String, String> parseHeaderLine(String line) {
Map<String, String> params = new LinkedHashMap<>();
int pos = line.indexOf(";");
String spec = line.substring(pos + 1);
if (pos < 0) {
return params;
}
String key = "";
String value;
boolean inKey = true;
boolean inString = false;
int start = 0;
int i;
for (i = 0; i < spec.length(); i++) {
switch (spec.charAt(i)) {
case '=':
if (inKey) {
key = spec.substring(start, i).trim().toLowerCase();
start = i + 1;
inKey = false;
} else if (!inString) {
throw new IllegalArgumentException(contentType + " value has illegal character '=' at " + i + ": " + spec);
}
break;
case ';':
if (inKey) {
if (spec.substring(start, i).trim().length() > 0) {
throw new IllegalArgumentException(contentType + " parameter missing value at " + i + ": " + spec);
} else {
throw new IllegalArgumentException(contentType + " parameter key has illegal character ';' at " + i + ": " + spec);
}
} else if (!inString) {
value = spec.substring(start, i).trim();
params.put(key, value);
key = null;
start = i + 1;
inKey = true;
}
break;
case '"':
if (inKey) {
throw new IllegalArgumentException(contentType + " key has illegal character '\"' at " + i + ": " + spec);
} else if (inString) {
value = spec.substring(start, i).trim();
params.put(key, value);
key = null;
for (i++; i < spec.length() && spec.charAt(i) != ';'; i++) {
if (!Character.isWhitespace(spec.charAt(i))) {
throw new IllegalArgumentException(contentType + " value has garbage after quoted string at " + i + ": " + spec);
}
}
start = i + 1;
inString = false;
inKey = true;
} else {
if (spec.substring(start, i).trim().length() > 0) {
throw new IllegalArgumentException(contentType + " value has garbage before quoted string at " + i + ": " + spec);
}
start = i + 1;
inString = true;
}
break;
}
}
if (inKey) {
if (pos > start && spec.substring(start, i).trim().length() > 0) {
throw new IllegalArgumentException(contentType + " missing value at " + i + ": " + spec);
}
} else if (!inString) {
value = spec.substring(start, i).trim();
params.put(key, value);
} else {
throw new IllegalArgumentException(contentType + " has an unterminated quoted string: " + spec);
}
return params;
}
private static int indexOf(byte[] array, byte[] target, int start, int end) {
if (target.length == 0) {
return 0;
}
outer:
for (int i = start; i < end - target.length + 1; i++) {
for (int j = 0; j < target.length; j++) {
if (array[i + j] != target[j]) {
continue outer;
}
}
return i;
}
return -1;
}
}

View file

@ -0,0 +1,13 @@
package org.xbib.netty.http.common.mime;
import io.netty.buffer.ByteBuf;
import java.util.Map;
public interface MimeMultipart {
Map headers();
ByteBuf body();
int length();
}

View file

@ -0,0 +1,6 @@
package org.xbib.netty.http.common.mime;
public interface MimeMultipartListener {
void handle(String type, String subtype, MimeMultipart part);
}

View file

@ -0,0 +1,12 @@
package org.xbib.netty.http.common.mime;
import java.io.IOException;
public interface MimeMultipartParser {
String type();
String subType();
void parse(MimeMultipartListener listener) throws IOException;
}

View file

@ -0,0 +1,41 @@
package org.xbib.netty.http.common.mime;
import io.netty.buffer.ByteBuf;
import java.nio.charset.StandardCharsets;
import java.util.Map;
public class MimePart implements MimeMultipart {
Map<String, String> headers;
ByteBuf body;
int length;
MimePart(Map<String, String> headers, ByteBuf body) {
this.headers = headers;
this.body = body;
this.length = body.readableBytes();
}
@Override
public Map<String, String> headers() {
return headers;
}
@Override
public ByteBuf body() {
return body;
}
@Override
public int length() {
return length;
}
@Override
public String toString() {
String b = body != null ? body.toString(StandardCharsets.UTF_8) : "";
return "headers=" + headers + " length=" + length + " body=" + b;
}
}

View file

@ -0,0 +1,3 @@
This work is based on
https://github.com/playframework/netty-reactive-streams/

View file

@ -166,9 +166,7 @@ public class HandlerPublisher<T> extends ChannelDuplexHandler implements Publish
if (subscriber == null) {
throw new NullPointerException("Null subscriber");
}
if (!hasSubscriber.compareAndSet(false, true)) {
// Must call onSubscribe first.
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
@ -179,12 +177,7 @@ public class HandlerPublisher<T> extends ChannelDuplexHandler implements Publish
});
subscriber.onError(new IllegalStateException("This publisher only supports one subscriber"));
} else {
executor.execute(new Runnable() {
@Override
public void run() {
provideSubscriber(subscriber);
}
});
executor.execute(() -> provideSubscriber(subscriber));
}
}
@ -216,8 +209,6 @@ public class HandlerPublisher<T> extends ChannelDuplexHandler implements Publish
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
// If the channel is not yet registered, then it's not safe to invoke any methods on it, eg read() or close()
// So don't provide the context until it is registered.
if (ctx.channel().isRegistered()) {
provideChannelContext(ctx);
}
@ -234,7 +225,6 @@ public class HandlerPublisher<T> extends ChannelDuplexHandler implements Publish
case NO_SUBSCRIBER_OR_CONTEXT:
verifyRegisteredWithRightExecutor(ctx);
this.ctx = ctx;
// It's set, we don't have a subscriber
state = NO_SUBSCRIBER;
break;
case NO_CONTEXT:
@ -244,7 +234,7 @@ public class HandlerPublisher<T> extends ChannelDuplexHandler implements Publish
subscriber.onSubscribe(new ChannelSubscription());
break;
default:
// Ignore, this could be invoked twice by both handlerAdded and channelRegistered.
break;
}
}
@ -256,7 +246,6 @@ public class HandlerPublisher<T> extends ChannelDuplexHandler implements Publish
@Override
public void channelActive(ChannelHandlerContext ctx) {
// If we subscribed before the channel was active, then our read would have been ignored.
if (state == DEMANDING) {
requestDemand();
}
@ -278,19 +267,16 @@ public class HandlerPublisher<T> extends ChannelDuplexHandler implements Publish
case IDLE:
if (addDemand(demand)) {
// Important to change state to demanding before doing a read, in case we get a synchronous
// read back.
state = DEMANDING;
requestDemand();
}
break;
default:
break;
}
}
private boolean addDemand(long demand) {
if (demand <= 0) {
illegalDemand();
return false;
@ -320,7 +306,7 @@ public class HandlerPublisher<T> extends ChannelDuplexHandler implements Publish
if (outstandingDemand > 0) {
if (state == BUFFERING) {
state = DEMANDING;
} // otherwise we're draining
}
requestDemand();
} else if (state == BUFFERING) {
state = IDLE;
@ -409,7 +395,6 @@ public class HandlerPublisher<T> extends ChannelDuplexHandler implements Publish
}
private void complete() {
switch (state) {
case NO_SUBSCRIBER:
case BUFFERING:
@ -422,8 +407,8 @@ public class HandlerPublisher<T> extends ChannelDuplexHandler implements Publish
state = DONE;
break;
case NO_SUBSCRIBER_ERROR:
// Ignore, we're already going to complete the stream with an error
// when the subscriber subscribes.
break;
default:
break;
}
}
@ -444,6 +429,8 @@ public class HandlerPublisher<T> extends ChannelDuplexHandler implements Publish
cleanup();
subscriber.onError(cause);
break;
default:
break;
}
}
@ -464,7 +451,7 @@ public class HandlerPublisher<T> extends ChannelDuplexHandler implements Publish
@Override
public void cancel() {
executor.execute(() -> receivedCancel());
executor.execute(HandlerPublisher.this::receivedCancel);
}
}

View file

@ -103,16 +103,13 @@ public class HandlerSubscriber<T> extends ChannelDuplexHandler implements Subscr
switch (state) {
case NO_SUBSCRIPTION_OR_CONTEXT:
this.ctx = ctx;
// We were in no subscription or context, now we just don't have a subscription.
state = NO_SUBSCRIPTION;
break;
case NO_CONTEXT:
this.ctx = ctx;
// We were in no context, we're now fully initialised
maybeStart();
break;
case COMPLETE:
// We are complete, close
state = COMPLETE;
ctx.close();
break;
@ -175,6 +172,8 @@ public class HandlerSubscriber<T> extends ChannelDuplexHandler implements Subscr
subscription.cancel();
state = CANCELLED;
break;
default:
break;
}
}
@ -201,6 +200,8 @@ public class HandlerSubscriber<T> extends ChannelDuplexHandler implements Subscr
case CANCELLED:
subscription.cancel();
break;
default:
break;
}
}
@ -248,6 +249,9 @@ public class HandlerSubscriber<T> extends ChannelDuplexHandler implements Subscr
ctx.close();
state = COMPLETE;
break;
default:
break;
}
});
}
@ -255,7 +259,6 @@ public class HandlerSubscriber<T> extends ChannelDuplexHandler implements Subscr
private void maybeRequestMore() {
if (outstandingDemand <= demandLowWatermark && ctx.channel().isWritable()) {
long toRequest = demandHighWatermark - outstandingDemand;
outstandingDemand = demandHighWatermark;
subscription.request(toRequest);
}

View file

@ -51,21 +51,16 @@ public class HttpStreamsClientHandler extends HttpStreamsHandler<HttpResponse, H
if (response.status().code() >= 100 && response.status().code() < 200) {
return false;
}
if (response.status().equals(HttpResponseStatus.NO_CONTENT) ||
response.status().equals(HttpResponseStatus.NOT_MODIFIED)) {
return false;
}
if (HttpUtil.isTransferEncodingChunked(response)) {
return true;
}
if (HttpUtil.isContentLengthSet(response)) {
return HttpUtil.getContentLength(response) > 0;
}
return true;
}
@ -132,7 +127,7 @@ public class HttpStreamsClientHandler extends HttpStreamsHandler<HttpResponse, H
ignoreResponseBody = true;
}
} else {
awaiting100ContinueMessage.subscribe(new CancelledSubscriber<HttpContent>());
awaiting100ContinueMessage.subscribe(new CancelledSubscriber<>());
awaiting100ContinueMessage = null;
awaiting100Continue.onSubscribe(new Subscription() {
public void request(long n) {

View file

@ -1,7 +1,6 @@
package org.xbib.netty.http.server.reactive;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
@ -136,43 +135,25 @@ abstract class HttpStreamsHandler<In extends HttpMessage, Out extends HttpMessag
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
if (inClass.isInstance(msg)) {
receivedInMessage(ctx);
final In inMsg = inClass.cast(msg);
if (inMsg instanceof FullHttpMessage) {
// Forward as is
ctx.fireChannelRead(inMsg);
consumedInMessage(ctx);
} else if (!hasBody(inMsg)) {
// Wrap in empty message
ctx.fireChannelRead(createEmptyMessage(inMsg));
consumedInMessage(ctx);
// There will be a LastHttpContent message coming after this, ignore it
ignoreBodyRead = true;
} else {
currentlyStreamedMessage = inMsg;
// It has a body, stream it
HandlerPublisher<HttpContent> publisher = new HandlerPublisher<HttpContent>(ctx.executor(), HttpContent.class) {
HandlerPublisher<HttpContent> publisher = new HandlerPublisher<>(ctx.executor(), HttpContent.class) {
@Override
protected void cancelled() {
if (ctx.executor().inEventLoop()) {
handleCancelled(ctx, inMsg);
} else {
ctx.executor().execute(new Runnable() {
@Override
public void run() {
handleCancelled(ctx, inMsg);
}
});
ctx.executor().execute(() -> handleCancelled(ctx, inMsg));
}
}
@ -182,7 +163,6 @@ abstract class HttpStreamsHandler<In extends HttpMessage, Out extends HttpMessag
super.requestDemand();
}
};
ctx.channel().pipeline().addAfter(ctx.name(), ctx.name() + "-body-publisher", publisher);
ctx.fireChannelRead(createStreamedMessage(inMsg, publisher));
}
@ -194,7 +174,6 @@ abstract class HttpStreamsHandler<In extends HttpMessage, Out extends HttpMessag
private void handleCancelled(ChannelHandlerContext ctx, In msg) {
if (currentlyStreamedMessage == msg) {
ignoreBodyRead = true;
// Need to do a read in case the subscriber ignored a read completed.
ctx.read();
}
}
@ -202,23 +181,18 @@ abstract class HttpStreamsHandler<In extends HttpMessage, Out extends HttpMessag
private void handleReadHttpContent(ChannelHandlerContext ctx, HttpContent content) {
if (!ignoreBodyRead) {
if (content instanceof LastHttpContent) {
if (content.content().readableBytes() > 0 ||
!((LastHttpContent) content).trailingHeaders().isEmpty()) {
// It has data or trailing headers, send them
ctx.fireChannelRead(content);
} else {
ReferenceCountUtil.release(content);
}
removeHandlerIfActive(ctx, ctx.name() + "-body-publisher");
currentlyStreamedMessage = null;
consumedInMessage(ctx);
} else {
ctx.fireChannelRead(content);
}
} else {
ReferenceCountUtil.release(content);
if (content instanceof LastHttpContent) {
@ -232,7 +206,7 @@ abstract class HttpStreamsHandler<In extends HttpMessage, Out extends HttpMessag
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
public void channelReadComplete(ChannelHandlerContext ctx) {
if (ignoreBodyRead) {
ctx.read();
} else {
@ -241,52 +215,36 @@ abstract class HttpStreamsHandler<In extends HttpMessage, Out extends HttpMessag
}
@Override
public void write(final ChannelHandlerContext ctx, Object msg, final ChannelPromise promise) throws Exception {
public void write(final ChannelHandlerContext ctx, Object msg, final ChannelPromise promise) {
if (outClass.isInstance(msg)) {
Outgoing out = new Outgoing(outClass.cast(msg), promise);
receivedOutMessage(ctx);
if (outgoing.isEmpty()) {
outgoing.add(out);
flushNext(ctx);
} else {
outgoing.add(out);
}
} else if (msg instanceof LastHttpContent) {
sendLastHttpContent = false;
ctx.write(msg, promise);
} else {
ctx.write(msg, promise);
}
}
protected void unbufferedWrite(final ChannelHandlerContext ctx, final Outgoing out) {
if (out.message instanceof FullHttpMessage) {
// Forward as is
ctx.writeAndFlush(out.message, out.promise);
out.promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
executeInEventLoop(ctx, new Runnable() {
@Override
public void run() {
sentOutMessage(ctx);
outgoing.remove();
flushNext(ctx);
}
});
}
});
out.promise.addListener((ChannelFutureListener) channelFuture ->
executeInEventLoop(ctx, () -> {
sentOutMessage(ctx);
outgoing.remove();
flushNext(ctx);
}));
} else if (out.message instanceof StreamedHttpMessage) {
StreamedHttpMessage streamed = (StreamedHttpMessage) out.message;
HandlerSubscriber<HttpContent> subscriber = new HandlerSubscriber<HttpContent>(ctx.executor()) {
HandlerSubscriber<HttpContent> subscriber = new HandlerSubscriber<>(ctx.executor()) {
@Override
protected void error(Throwable error) {
out.promise.tryFailure(error);
@ -295,45 +253,26 @@ abstract class HttpStreamsHandler<In extends HttpMessage, Out extends HttpMessag
@Override
protected void complete() {
executeInEventLoop(ctx, new Runnable() {
@Override
public void run() {
completeBody(ctx);
}
});
executeInEventLoop(ctx, () -> completeBody(ctx));
}
};
sendLastHttpContent = true;
// DON'T pass the promise through, create a new promise instead.
ctx.writeAndFlush(out.message);
ctx.pipeline().addAfter(ctx.name(), ctx.name() + "-body-subscriber", subscriber);
subscribeSubscriberToStream(streamed, subscriber);
}
}
private void completeBody(final ChannelHandlerContext ctx) {
removeHandlerIfActive(ctx, ctx.name() + "-body-subscriber");
if (sendLastHttpContent) {
ChannelPromise promise = outgoing.peek().promise;
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, promise).addListener(
new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
executeInEventLoop(ctx, new Runnable() {
@Override
public void run() {
outgoing.remove();
sentOutMessage(ctx);
flushNext(ctx);
}
});
}
}
(ChannelFutureListener) channelFuture -> executeInEventLoop(ctx, () -> {
outgoing.remove();
sentOutMessage(ctx);
flushNext(ctx);
})
);
} else {
outgoing.remove().promise.setSuccess();
@ -374,7 +313,7 @@ abstract class HttpStreamsHandler<In extends HttpMessage, Out extends HttpMessag
final Out message;
final ChannelPromise promise;
public Outgoing(Out message, ChannelPromise promise) {
Outgoing(Out message, ChannelPromise promise) {
this.message = message;
this.promise = promise;
}

View file

@ -53,7 +53,7 @@ public class HttpStreamsServerHandler extends HttpStreamsHandler<HttpRequest, Ht
private final List<ChannelHandler> dependentHandlers;
public HttpStreamsServerHandler() {
this(Collections.<ChannelHandler>emptyList());
this(Collections.emptyList());
}
/**
@ -89,11 +89,8 @@ public class HttpStreamsServerHandler extends HttpStreamsHandler<HttpRequest, Ht
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// Set to false, since if it was true, and the client is sending data, then the
// client must no longer be expecting it (due to a timeout, for example).
continueExpected = false;
sendContinue = false;
if (msg instanceof HttpRequest) {
HttpRequest request = (HttpRequest) msg;
lastRequest = request;
@ -117,7 +114,6 @@ public class HttpStreamsServerHandler extends HttpStreamsHandler<HttpRequest, Ht
sendContinue = false;
continueExpected = false;
}
if (close) {
ctx.close();
}
@ -125,13 +121,10 @@ public class HttpStreamsServerHandler extends HttpStreamsHandler<HttpRequest, Ht
@Override
protected void unbufferedWrite(ChannelHandlerContext ctx, HttpStreamsHandler<HttpRequest, HttpResponse>.Outgoing out) {
if (out.message instanceof WebSocketHttpResponse) {
if ((lastRequest instanceof FullHttpRequest) || !hasBody(lastRequest)) {
handleWebSocketResponse(ctx, out);
} else {
// If the response has a streamed body, then we can't send the WebSocket response until we've received
// the body.
webSocketResponse = out;
}
} else {
@ -150,8 +143,6 @@ public class HttpStreamsServerHandler extends HttpStreamsHandler<HttpRequest, Ht
close = true;
continueExpected = false;
}
// According to RFC 7230 a server MUST NOT send a Content-Length or a Transfer-Encoding when the status
// code is 1xx or 204, also a status code 304 may not have a Content-Length or Transfer-Encoding set.
if (!HttpUtil.isContentLengthSet(out.message) && !HttpUtil.isTransferEncodingChunked(out.message)
&& canHaveBody(out.message)) {
HttpUtil.setKeepAlive(out.message, false);
@ -163,8 +154,6 @@ public class HttpStreamsServerHandler extends HttpStreamsHandler<HttpRequest, Ht
private boolean canHaveBody(HttpResponse message) {
HttpResponseStatus status = message.status();
// All 1xx (Informational), 204 (No Content), and 304 (Not Modified)
// responses do not include a message body
return !(status == HttpResponseStatus.CONTINUE || status == HttpResponseStatus.SWITCHING_PROTOCOLS ||
status == HttpResponseStatus.PROCESSING || status == HttpResponseStatus.NO_CONTENT ||
status == HttpResponseStatus.NOT_MODIFIED);
@ -181,7 +170,6 @@ public class HttpStreamsServerHandler extends HttpStreamsHandler<HttpRequest, Ht
private void handleWebSocketResponse(ChannelHandlerContext ctx, Outgoing out) {
WebSocketHttpResponse response = (WebSocketHttpResponse) out.message;
WebSocketServerHandshaker handshaker = response.handshakerFactory().newHandshaker(lastRequest);
if (handshaker == null) {
HttpResponse res = new DefaultFullHttpResponse(
HttpVersion.HTTP_1_1,
@ -191,26 +179,16 @@ public class HttpStreamsServerHandler extends HttpStreamsHandler<HttpRequest, Ht
super.unbufferedWrite(ctx, new Outgoing(res, out.promise));
response.subscribe(new CancelledSubscriber<>());
} else {
// First, insert new handlers in the chain after us for handling the websocket
ChannelPipeline pipeline = ctx.pipeline();
HandlerPublisher<WebSocketFrame> publisher = new HandlerPublisher<>(ctx.executor(), WebSocketFrame.class);
HandlerSubscriber<WebSocketFrame> subscriber = new HandlerSubscriber<>(ctx.executor());
pipeline.addAfter(ctx.executor(), ctx.name(), "websocket-subscriber", subscriber);
pipeline.addAfter(ctx.executor(), ctx.name(), "websocket-publisher", publisher);
// Now remove ourselves from the chain
ctx.pipeline().remove(ctx.name());
// Now do the handshake
// Wrap the request in an empty request because we don't need the WebSocket handshaker ignoring the body,
// we already have handled the body.
handshaker.handshake(ctx.channel(), new EmptyHttpRequest(lastRequest));
// And hook up the subscriber/publishers
response.subscribe(subscriber);
publisher.subscribe(response);
}
}
@Override

View file

@ -79,6 +79,7 @@ public class Http1ChannelInitializer extends ChannelInitializer<Channel>
private void configureCleartext(Channel channel) {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("http-server-chunked-write", new ChunkedWriteHandler());
pipeline.addLast("http-server-codec",
new HttpServerCodec(serverConfig.getMaxInitialLineLength(),
serverConfig.getMaxHeadersSize(), serverConfig.getMaxChunkSize()));
@ -96,7 +97,6 @@ public class Http1ChannelInitializer extends ChannelInitializer<Channel>
httpObjectAggregator.setMaxCumulationBufferComponents(serverConfig.getMaxCompositeBufferComponents());
pipeline.addLast("http-server-aggregator", httpObjectAggregator);
pipeline.addLast("http-server-pipelining", new HttpPipeliningHandler(serverConfig.getPipeliningCapacity()));
pipeline.addLast("http-server-chunked-write", new ChunkedWriteHandler());
pipeline.addLast("http-server-handler", new HttpHandler(server));
pipeline.addLast("http-idle-timeout-handler", new IdleTimeoutHandler(serverConfig.getIdleTimeoutMillis()));
}

View file

@ -21,6 +21,7 @@ import io.netty.handler.codec.http2.Http2MultiplexCodec;
import io.netty.handler.codec.http2.Http2MultiplexCodecBuilder;
import io.netty.handler.codec.http2.Http2ServerUpgradeCodec;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.stream.ChunkedWriteHandler;

View file

@ -1,215 +0,0 @@
package org.xbib.netty.http.server.handler.http2;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.EncoderException;
import io.netty.handler.codec.MessageToMessageCodec;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultLastHttpContent;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpScheme;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
import io.netty.handler.codec.http2.Http2DataFrame;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2HeadersFrame;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.codec.http2.Http2StreamFrame;
import io.netty.handler.codec.http2.HttpConversionUtil;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.internal.UnstableApi;
import java.util.List;
/**
* This handler converts from {@link Http2StreamFrame} to {@link HttpObject},
* and back. It can be used as an adapter to make http/2 connections backward-compatible with
* {@link ChannelHandler}s expecting {@link HttpObject}.
*
* For simplicity, it converts to chunked encoding unless the entire stream
* is a single header.
*/
@UnstableApi
@Sharable
public class Http2StreamFrameToHttpObjectCodec extends MessageToMessageCodec<Http2StreamFrame, HttpObject> {
private final boolean isServer;
private final boolean validateHeaders;
private HttpScheme scheme;
public Http2StreamFrameToHttpObjectCodec(final boolean isServer,
final boolean validateHeaders) {
this.isServer = isServer;
this.validateHeaders = validateHeaders;
scheme = HttpScheme.HTTP;
}
public Http2StreamFrameToHttpObjectCodec(final boolean isServer) {
this(isServer, true);
}
@Override
public boolean acceptInboundMessage(Object msg) throws Exception {
return (msg instanceof Http2HeadersFrame) || (msg instanceof Http2DataFrame);
}
@Override
protected void decode(ChannelHandlerContext ctx, Http2StreamFrame frame, List<Object> out) throws Exception {
if (frame instanceof Http2HeadersFrame) {
int id = frame.stream() != null ? frame.stream().id() : -1;
Http2HeadersFrame headersFrame = (Http2HeadersFrame) frame;
Http2Headers headers = headersFrame.headers();
final CharSequence status = headers.status();
// 100-continue response is a special case where Http2HeadersFrame#isEndStream=false
// but we need to decode it as a FullHttpResponse to play nice with HttpObjectAggregator.
if (null != status && HttpResponseStatus.CONTINUE.codeAsText().contentEquals(status)) {
final FullHttpMessage fullMsg = newFullMessage(id, headers, ctx.alloc());
out.add(fullMsg);
return;
}
if (headersFrame.isEndStream()) {
if (headers.method() == null && status == null) {
LastHttpContent last = new DefaultLastHttpContent(Unpooled.EMPTY_BUFFER, validateHeaders);
HttpConversionUtil.addHttp2ToHttpHeaders(id, headers, last.trailingHeaders(),
HttpVersion.HTTP_1_1, true, true);
out.add(last);
} else {
FullHttpMessage full = newFullMessage(id, headers, ctx.alloc());
out.add(full);
}
} else {
HttpMessage req = newMessage(id, headers);
if (!HttpUtil.isContentLengthSet(req)) {
req.headers().add(HttpHeaderNames.TRANSFER_ENCODING, HttpHeaderValues.CHUNKED);
}
out.add(req);
}
} else if (frame instanceof Http2DataFrame) {
Http2DataFrame dataFrame = (Http2DataFrame) frame;
if (dataFrame.isEndStream()) {
out.add(new DefaultLastHttpContent(dataFrame.content().retain(), validateHeaders));
} else {
out.add(new DefaultHttpContent(dataFrame.content().retain()));
}
}
}
private void encodeLastContent(LastHttpContent last, List<Object> out) {
boolean needFiller = !(last instanceof FullHttpMessage) && last.trailingHeaders().isEmpty();
if (last.content().isReadable() || needFiller) {
out.add(new DefaultHttp2DataFrame(last.content().retain(), last.trailingHeaders().isEmpty()));
}
if (!last.trailingHeaders().isEmpty()) {
Http2Headers headers = HttpConversionUtil.toHttp2Headers(last.trailingHeaders(), validateHeaders);
out.add(new DefaultHttp2HeadersFrame(headers, true));
}
}
/**
* Encode from an {@link HttpObject} to an {@link Http2StreamFrame}. This method will
* be called for each written message that can be handled by this encoder.
*
* NOTE: 100-Continue responses that are NOT {@link FullHttpResponse} will be rejected.
*
* @param ctx the {@link ChannelHandlerContext} which this handler belongs to
* @param obj the {@link HttpObject} message to encode
* @param out the {@link List} into which the encoded msg should be added
* needs to do some kind of aggregation
* @throws Exception is thrown if an error occurs
*/
@Override
protected void encode(ChannelHandlerContext ctx, HttpObject obj, List<Object> out) throws Exception {
// 100-continue is typically a FullHttpResponse, but the decoded
// Http2HeadersFrame should not be marked as endStream=true
if (obj instanceof HttpResponse) {
final HttpResponse res = (HttpResponse) obj;
if (res.status().equals(HttpResponseStatus.CONTINUE)) {
if (res instanceof FullHttpResponse) {
final Http2Headers headers = toHttp2Headers(res);
out.add(new DefaultHttp2HeadersFrame(headers, false));
return;
} else {
throw new EncoderException(
HttpResponseStatus.CONTINUE.toString() + " must be a FullHttpResponse");
}
}
}
if (obj instanceof HttpMessage) {
Http2Headers headers = toHttp2Headers((HttpMessage) obj);
boolean noMoreFrames = false;
if (obj instanceof FullHttpMessage) {
FullHttpMessage full = (FullHttpMessage) obj;
noMoreFrames = !full.content().isReadable() && full.trailingHeaders().isEmpty();
}
out.add(new DefaultHttp2HeadersFrame(headers, noMoreFrames));
}
if (obj instanceof LastHttpContent) {
LastHttpContent last = (LastHttpContent) obj;
encodeLastContent(last, out);
} else if (obj instanceof HttpContent) {
HttpContent cont = (HttpContent) obj;
out.add(new DefaultHttp2DataFrame(cont.content().retain(), false));
}
}
private Http2Headers toHttp2Headers(final HttpMessage msg) {
if (msg instanceof HttpRequest) {
msg.headers().set(
HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(),
scheme.name());
}
return HttpConversionUtil.toHttp2Headers(msg, validateHeaders);
}
private HttpMessage newMessage(final int id,
final Http2Headers headers) throws Http2Exception {
return isServer ?
HttpConversionUtil.toHttpRequest(id, headers, validateHeaders) :
HttpConversionUtil.toHttpResponse(id, headers, validateHeaders);
}
private FullHttpMessage newFullMessage(final int id,
final Http2Headers headers,
final ByteBufAllocator alloc) throws Http2Exception {
return isServer ?
HttpConversionUtil.toFullHttpRequest(id, headers, alloc, validateHeaders) :
HttpConversionUtil.toFullHttpResponse(id, headers, alloc, validateHeaders);
}
@Override
public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
// This handler is typically used on an Http2StreamChannel. At this
// stage, ssl handshake should've been established. checking for the
// presence of SslHandler in the parent's channel pipeline to
// determine the HTTP scheme should suffice, even for the case where
// SniHandler is used.
scheme = isSsl(ctx) ? HttpScheme.HTTPS : HttpScheme.HTTP;
}
protected boolean isSsl(final ChannelHandlerContext ctx) {
final Channel ch = ctx.channel();
final Channel connChannel = (ch instanceof Http2StreamChannel) ? ch.parent() : ch;
return null != connChannel.pipeline().get(SslHandler.class);
}
}

View file

@ -66,8 +66,10 @@ public class HttpServerRequest implements ServerRequest {
}
void handleParameters() {
if (logger.isLoggable(Level.FINER)) {
logger.log(Level.FINER, () -> "request = " + httpRequest);
}
Charset charset = HttpUtil.getCharset(httpRequest, StandardCharsets.UTF_8);
HttpParameters httpParameters = new HttpParameters();
this.url = URL.builder()
.charset(charset, CodingErrorAction.REPLACE)
.path(httpRequest.uri()) // creates path, query params, fragment
@ -76,31 +78,28 @@ public class HttpServerRequest implements ServerRequest {
CharSequence mimeType = HttpUtil.getMimeType(httpRequest);
ByteBuf byteBuf = httpRequest.content();
if (logger.isLoggable(Level.FINER)) {
logger.log(Level.FINER, "url = " + url +
logger.log(Level.FINER, () -> "url = " + url +
" charset = " + charset +
" mime type = " + mimeType +
" queryParameters = " + queryParameters +
" body exists = " + (byteBuf != null));
}
if (byteBuf != null) {
if (httpRequest.method().equals(HttpMethod.POST) && mimeType != null) {
if (httpRequest.method().equals(HttpMethod.POST)) {
String params;
// https://www.w3.org/TR/html4/interact/forms.html#h-17.13.4
if (HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED.toString().equals(mimeType.toString())) {
if (mimeType != null && HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED.toString().equals(mimeType.toString())) {
Charset htmlCharset = HttpUtil.getCharset(httpRequest, StandardCharsets.ISO_8859_1);
params = byteBuf.toString(htmlCharset).replace('+', ' ');
if (logger.isLoggable(Level.FINER)) {
logger.log(Level.FINER, "html form, charset = " + htmlCharset + " param body = " + params);
}
} else {
params = byteBuf.toString(charset);
if (logger.isLoggable(Level.FINER)) {
logger.log(Level.FINER, "not a html form, charset = " + charset + " param body = " + params);
}
queryParameters.addPercentEncodedBody(params);
queryParameters.add("_raw", params);
}
queryParameters.addPercentEncodedBody(params);
queryParameters.add("_body", params);
}
}
HttpParameters httpParameters = new HttpParameters();
for (Pair<String, String> pair : queryParameters) {
httpParameters.add(pair.getFirst(), pair.getSecond());
}

View file

@ -23,6 +23,7 @@ public class NettyHttpTestExtension implements BeforeAllCallback {
System.setProperty("io.netty.noUnsafe", Boolean.toString(true));
//System.setProperty("io.netty.leakDetection.level", "paranoid");
Level level = Level.INFO;
//Level level = Level.ALL;
System.setProperty("java.util.logging.SimpleFormatter.format",
"%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$-7s [%3$s] %5$s %6$s%n");
LogManager.getLogManager().reset();

View file

@ -19,6 +19,7 @@ import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.QueryStringDecoder;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
@ -48,6 +49,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Disabled /* flaky */
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ExtendWith(NettyHttpTestExtension.class)
class HttpPipeliningHandlerTest {
@ -166,7 +168,7 @@ class HttpPipeliningHandlerTest {
}
}
private class WorkEmulatorHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest> {
private static class WorkEmulatorHandler extends SimpleChannelInboundHandler<HttpPipelinedRequest> {
private final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

View file

@ -0,0 +1,83 @@
package org.xbib.netty.http.server.test.http1;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.multipart.MixedFileUpload;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.api.Request;
import org.xbib.netty.http.client.api.ResponseListener;
import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.common.HttpParameters;
import org.xbib.netty.http.common.HttpResponse;
import org.xbib.netty.http.server.Domain;
import org.xbib.netty.http.server.Server;
import org.xbib.netty.http.server.api.ServerResponse;
import org.xbib.netty.http.server.test.NettyHttpTestExtension;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
@ExtendWith(NettyHttpTestExtension.class)
class MimeUploadTest {
private static final Logger logger = Logger.getLogger(MimeUploadTest.class.getName());
@Test
void testMimetHttp1() throws Exception {
final AtomicBoolean success1 = new AtomicBoolean(false);
HttpAddress httpAddress = HttpAddress.http1("localhost", 8008);
Domain domain = Domain.builder(httpAddress)
.singleEndpoint("/upload", "/**", (req, resp) -> {
HttpParameters parameters = req.getParameters();
logger.log(Level.INFO, "got request, headers = " + req.getHeaders() +
" params = " + parameters.toString() +
" body = " + req.getContent().toString(StandardCharsets.UTF_8));
ServerResponse.write(resp, HttpResponseStatus.OK);
}, "POST")
.build();
Server server = Server.builder(domain)
.build();
Client client = Client.builder()
.enableDebug()
.build();
try {
server.accept();
ByteBuf byteBuf = Unpooled.buffer();
ByteBufOutputStream outputStream = new ByteBufOutputStream(byteBuf);
int max = 10 * 1024;
for (int i = 0; i < max; i++) {
outputStream.writeBytes("Hi");
}
MixedFileUpload upload = new MixedFileUpload("Test upload",
"test.txt", "text/plain", "binary",
StandardCharsets.UTF_8, byteBuf.readableBytes(), 10 * 1024);
upload.setContent(byteBuf);
ResponseListener<HttpResponse> responseListener = (resp) -> {
if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) {
success1.set(true);
}
};
Request postRequest = Request.post()
.setVersion(HttpVersion.HTTP_1_1)
.url(server.getServerConfig().getAddress().base().resolve("/upload"))
.addBodyData(upload)
.setResponseListener(responseListener)
.build();
client.execute(postRequest).get();
} finally {
server.shutdownGracefully();
client.shutdownGracefully();
logger.log(Level.INFO, "server and client shut down");
}
assertTrue(success1.get());
}
}

View file

@ -241,4 +241,53 @@ class PostTest {
assertTrue(success3.get());
assertTrue(success4.get());
}
@Test
void testPostInvalidPercentEncodings() throws Exception {
final AtomicBoolean success1 = new AtomicBoolean(false);
final AtomicBoolean success2 = new AtomicBoolean(false);
final AtomicBoolean success3 = new AtomicBoolean(false);
HttpAddress httpAddress = HttpAddress.http1("localhost", 8008);
Domain domain = Domain.builder(httpAddress)
.singleEndpoint("/post", "/**", (req, resp) -> {
HttpParameters parameters = req.getParameters();
logger.log(Level.INFO, "got request " + parameters.toString() + ", sending OK");
if ("myÿvalue".equals(parameters.getFirst("my param"))) {
success1.set(true);
}
if ("b%YYc".equals(parameters.getFirst("a"))) {
success2.set(true);
}
ServerResponse.write(resp, HttpResponseStatus.OK);
}, "POST")
.build();
Server server = Server.builder(domain)
.build();
Client client = Client.builder()
.build();
try {
server.accept();
ResponseListener<HttpResponse> responseListener = (resp) -> {
if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) {
success3.set(true);
}
};
Request postRequest = Request.post().setVersion(HttpVersion.HTTP_1_1)
.url(server.getServerConfig().getAddress().base().resolve("/post/test.txt"))
.contentType(HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED, StandardCharsets.ISO_8859_1)
.addRawParameter("a", "b%YYc")
.addRawFormParameter("my param", "my%ZZvalue")
.setResponseListener(responseListener)
.build();
client.execute(postRequest).get();
} finally {
server.shutdownGracefully();
client.shutdownGracefully();
logger.log(Level.INFO, "server and client shut down");
}
assertTrue(success1.get());
assertTrue(success2.get());
assertTrue(success3.get());
}
}