update to Netty 4.1.42, fix parameter percent encoding

This commit is contained in:
Jörg Prante 2019-10-01 14:48:47 +02:00
parent 53ab059bb3
commit cb2b6a23df
15 changed files with 136 additions and 226 deletions

View file

@ -1,9 +1,9 @@
group = org.xbib group = org.xbib
name = netty-http name = netty-http
version = 4.1.41.2 version = 4.1.42.0
# netty # netty
netty.version = 4.1.41.Final netty.version = 4.1.42.Final
tcnative.version = 2.0.25.Final tcnative.version = 2.0.25.Final
# for netty-http-common # for netty-http-common

View file

@ -11,8 +11,6 @@ import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.QueryStringEncoder;
import io.netty.handler.codec.http.multipart.InterfaceHttpData; import io.netty.handler.codec.http.multipart.InterfaceHttpData;
import io.netty.handler.codec.http2.HttpConversionUtil; import io.netty.handler.codec.http2.HttpConversionUtil;
import io.netty.util.AsciiString; import io.netty.util.AsciiString;
@ -35,7 +33,6 @@ import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -46,8 +43,6 @@ public final class Request {
private final URL url; private final URL url;
private final String uri;
private final HttpVersion httpVersion; private final HttpVersion httpVersion;
private final HttpMethod httpMethod; private final HttpMethod httpMethod;
@ -76,12 +71,11 @@ public final class Request {
private ResponseListener<HttpResponse> responseListener; private ResponseListener<HttpResponse> responseListener;
private Request(URL url, String uri, HttpVersion httpVersion, HttpMethod httpMethod, private Request(URL url, HttpVersion httpVersion, HttpMethod httpMethod,
HttpHeaders headers, Collection<Cookie> cookies, ByteBuf content, List<InterfaceHttpData> bodyData, HttpHeaders headers, Collection<Cookie> cookies, ByteBuf content, List<InterfaceHttpData> bodyData,
long timeoutInMillis, boolean followRedirect, int maxRedirect, int redirectCount, long timeoutInMillis, boolean followRedirect, int maxRedirect, int redirectCount,
boolean isBackOff, BackOff backOff, ResponseListener<HttpResponse> responseListener) { boolean isBackOff, BackOff backOff, ResponseListener<HttpResponse> responseListener) {
this.url = url; this.url = url;
this.uri = uri;
this.httpVersion = httpVersion; this.httpVersion = httpVersion;
this.httpMethod = httpMethod; this.httpMethod = httpMethod;
this.headers = headers; this.headers = headers;
@ -106,8 +100,7 @@ public final class Request {
} }
public String relative() { public String relative() {
// is already in external form return url.relativeReference();
return uri;
} }
public HttpVersion httpVersion() { public HttpVersion httpVersion() {
@ -135,7 +128,8 @@ public final class Request {
} }
/** /**
* Return the timeout in milliseconds per request. This overrides the read timeout of the client. * Return the timeout in milliseconds per request.
* This overrides the read timeout of the client.
* @return timeout timeout in milliseconds * @return timeout timeout in milliseconds
*/ */
public long getTimeoutInMillis() { public long getTimeoutInMillis() {
@ -248,7 +242,7 @@ public final class Request {
public static Builder builder(HttpMethod httpMethod, Request request) { public static Builder builder(HttpMethod httpMethod, Request request) {
return builder(PooledByteBufAllocator.DEFAULT, httpMethod) return builder(PooledByteBufAllocator.DEFAULT, httpMethod)
.setVersion(request.httpVersion) .setVersion(request.httpVersion)
.uri(request.uri) .url(request.url)
.setHeaders(request.headers) .setHeaders(request.headers)
.content(request.content) .content(request.content)
.setResponseListener(request.responseListener); .setResponseListener(request.responseListener);
@ -304,8 +298,6 @@ public final class Request {
private URL url; private URL url;
private String uri;
private CharSequence contentType; private CharSequence contentType;
private HttpParameters uriParameters; private HttpParameters uriParameters;
@ -342,11 +334,17 @@ public final class Request {
this.headers = new DefaultHttpHeaders(); this.headers = new DefaultHttpHeaders();
this.removeHeaders = new ArrayList<>(); this.removeHeaders = new ArrayList<>();
this.cookies = new HashSet<>(); this.cookies = new HashSet<>();
this.uriParameters = new HttpParameters();
this.bodyData = new ArrayList<>(); this.bodyData = new ArrayList<>();
charset(StandardCharsets.UTF_8); charset(StandardCharsets.UTF_8);
} }
public Builder charset(Charset charset) {
this.encoder = PercentEncoders.getQueryEncoder(charset);
this.formParameters = new HttpParameters(DEFAULT_FORM_CONTENT_TYPE);
this.uriParameters = new HttpParameters(DEFAULT_FORM_CONTENT_TYPE);
return this;
}
public Builder setMethod(HttpMethod httpMethod) { public Builder setMethod(HttpMethod httpMethod) {
this.httpMethod = httpMethod; this.httpMethod = httpMethod;
return this; return this;
@ -396,11 +394,6 @@ public final class Request {
return this; return this;
} }
public Builder uri(String uri) {
this.uri = uri;
return this;
}
public Builder setHeaders(HttpHeaders headers) { public Builder setHeaders(HttpHeaders headers) {
this.headers = headers; this.headers = headers;
return this; return this;
@ -421,12 +414,6 @@ public final class Request {
return this; return this;
} }
public Builder charset(Charset charset) {
this.encoder = PercentEncoders.getQueryEncoder(charset);
this.formParameters = new HttpParameters(DEFAULT_FORM_CONTENT_TYPE);
return this;
}
public Builder contentType(CharSequence contentType) { public Builder contentType(CharSequence contentType) {
Objects.requireNonNull(contentType); Objects.requireNonNull(contentType);
this.contentType = contentType; this.contentType = contentType;
@ -446,28 +433,28 @@ public final class Request {
public Builder addParameter(String name, String value) { public Builder addParameter(String name, String value) {
Objects.requireNonNull(name); Objects.requireNonNull(name);
Objects.requireNonNull(value); Objects.requireNonNull(value);
uriParameters.add(encode(contentType, name), encode(contentType, value)); uriParameters.addRaw(encode(contentType, name), encode(contentType, value));
return this; return this;
} }
public Builder addRawParameter(String name, String value) { public Builder addRawParameter(String name, String value) {
Objects.requireNonNull(name); Objects.requireNonNull(name);
Objects.requireNonNull(value); Objects.requireNonNull(value);
uriParameters.add(name, value); uriParameters.addRaw(name, value);
return this; return this;
} }
public Builder addFormParameter(String name, String value) { public Builder addFormParameter(String name, String value) {
Objects.requireNonNull(name); Objects.requireNonNull(name);
Objects.requireNonNull(value); Objects.requireNonNull(value);
formParameters.add(encode(contentType, name), encode(contentType, value)); formParameters.addRaw(encode(contentType, name), encode(contentType, value));
return this; return this;
} }
public Builder addRawFormParameter(String name, String value) { public Builder addRawFormParameter(String name, String value) {
Objects.requireNonNull(name); Objects.requireNonNull(name);
Objects.requireNonNull(value); Objects.requireNonNull(value);
formParameters.add(name, value); formParameters.addRaw(name, value);
return this; return this;
} }
@ -488,7 +475,7 @@ public final class Request {
} }
return encodedValue; return encodedValue;
} catch (MalformedInputException | UnmappableCharacterException e) { } catch (MalformedInputException | UnmappableCharacterException e) {
// should never be reached // should never be reached because encoder does not bail out on error
throw new IllegalArgumentException(e); throw new IllegalArgumentException(e);
} }
} }
@ -591,33 +578,16 @@ public final class Request {
public Request build() { public Request build() {
DefaultHttpHeaders validatedHeaders = new DefaultHttpHeaders(true); DefaultHttpHeaders validatedHeaders = new DefaultHttpHeaders(true);
validatedHeaders.set(headers);
if (url != null) { if (url != null) {
// attach user query parameters to URL // add our URI parameters to the URL
URL.Builder mutator = url.mutator(); URL.Builder mutator = url.mutator();
uriParameters.forEach((k, v) -> v.forEach(value -> mutator.queryParam(k, value))); uriParameters.forEach((k, v) -> v.forEach(vv -> {
// no percent encoding
mutator.queryParam(k, vv);
}));
// calling build() performs percent encoding
url = mutator.build(); url = mutator.build();
// let Netty's query string decoder/encoder work over the URL to add parameters given implicitly in url()
String path = url.getPath();
String query = url.getQuery();
QueryStringDecoder queryStringDecoder = new QueryStringDecoder(query != null ? path + "?" + query : path, StandardCharsets.UTF_8);
QueryStringEncoder queryStringEncoder = new QueryStringEncoder(queryStringDecoder.path());
for (Map.Entry<String, List<String>> entry : queryStringDecoder.parameters().entrySet()) {
for (String value : entry.getValue()) {
queryStringEncoder.addParam(entry.getKey(), value);
}
}
// build uri from QueryStringDecoder
String pathAndQuery = queryStringEncoder.toString();
StringBuilder sb = new StringBuilder();
if (!pathAndQuery.isEmpty()) {
sb.append(pathAndQuery);
}
String fragment = url.getFragment();
if (fragment != null && !fragment.isEmpty()) {
sb.append('#').append(fragment);
}
this.uri = sb.toString(); // the encoded form of path/query/fragment
validatedHeaders.set(headers);
String scheme = url.getScheme(); String scheme = url.getScheme();
if (httpVersion.majorVersion() == 2) { if (httpVersion.majorVersion() == 2) {
validatedHeaders.set(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), scheme); validatedHeaders.set(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), scheme);
@ -631,10 +601,9 @@ public final class Request {
if (gzip) { if (gzip) {
validatedHeaders.set(HttpHeaderNames.ACCEPT_ENCODING, "gzip"); validatedHeaders.set(HttpHeaderNames.ACCEPT_ENCODING, "gzip");
} }
// form parameters
if (!formParameters.isEmpty()) { if (!formParameters.isEmpty()) {
try { try {
// formParameters is already percent encoded // form parameters are already percent encoded
content(formParameters.getAsQueryString(false), formParameters.getContentType()); content(formParameters.getAsQueryString(false), formParameters.getContentType());
} catch (MalformedInputException | UnmappableCharacterException e) { } catch (MalformedInputException | UnmappableCharacterException e) {
throw new IllegalArgumentException(); throw new IllegalArgumentException();
@ -661,7 +630,7 @@ public final class Request {
for (String headerName : removeHeaders) { for (String headerName : removeHeaders) {
validatedHeaders.remove(headerName); validatedHeaders.remove(headerName);
} }
return new Request(url, uri, httpVersion, httpMethod, validatedHeaders, cookies, content, bodyData, return new Request(url, httpVersion, httpMethod, validatedHeaders, cookies, content, bodyData,
timeoutInMillis, followRedirect, maxRedirects, 0, enableBackOff, backOff, timeoutInMillis, followRedirect, maxRedirects, 0, enableBackOff, backOff,
responseListener); responseListener);
} }

View file

@ -51,4 +51,6 @@ public interface Transport extends AutoCloseable {
SSLSession getSession(); SSLSession getSession();
void close() throws IOException;
} }

View file

@ -116,7 +116,9 @@ public final class Client implements AutoCloseable {
this.protocolProviders = new ArrayList<>(); this.protocolProviders = new ArrayList<>();
for (ProtocolProvider<HttpChannelInitializer, Transport> provider : ServiceLoader.load(ProtocolProvider.class)) { for (ProtocolProvider<HttpChannelInitializer, Transport> provider : ServiceLoader.load(ProtocolProvider.class)) {
protocolProviders.add(provider); protocolProviders.add(provider);
logger.log(Level.INFO, "protocol provider up: " + provider.transportClass() ); if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "protocol provider up: " + provider.transportClass());
}
} }
initializeTrustManagerFactory(clientConfig); initializeTrustManagerFactory(clientConfig);
this.byteBufAllocator = byteBufAllocator != null ? this.byteBufAllocator = byteBufAllocator != null ?

View file

@ -106,7 +106,8 @@ public class Http1Transport extends BaseTransport {
Request request; Request request;
DefaultHttpResponse httpResponse = null; DefaultHttpResponse httpResponse = null;
try { try {
// streamID is expected to be null, last request on memory is expected to be current, remove request from memory // streamID is expected to be null, last request on memory
// is expected to be current, remove request from memory
request = requests.get(requestKey); request = requests.get(requestKey);
if (request != null) { if (request != null) {
for (String cookieString : fullHttpResponse.headers().getAll(HttpHeaderNames.SET_COOKIE)) { for (String cookieString : fullHttpResponse.headers().getAll(HttpHeaderNames.SET_COOKIE)) {
@ -128,7 +129,8 @@ public class Http1Transport extends BaseTransport {
} else { } else {
Request continueRequest = continuation(request, httpResponse); Request continueRequest = continuation(request, httpResponse);
if (continueRequest != null) { if (continueRequest != null) {
// continue with new transport, synchronous call here, wait for completion // continue with new transport, synchronous call here,
// wait for completion
client.continuation(this, continueRequest); client.continuation(this, continueRequest);
} }
} }
@ -166,7 +168,8 @@ public class Http1Transport extends BaseTransport {
} }
@Override @Override
public void pushPromiseReceived(Channel channel, Integer streamId, Integer promisedStreamId, Http2Headers headers) { public void pushPromiseReceived(Channel channel, Integer streamId,
Integer promisedStreamId, Http2Headers headers) {
} }
@Override @Override

View file

@ -51,18 +51,21 @@ public class HttpParameters implements Map<String, SortedSet<String>> {
private final CharSequence contentType; private final CharSequence contentType;
private final Charset charset;
public HttpParameters() { public HttpParameters() {
this(1024, 1024, 65536, this(1024, 1024, 65536,
HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED, StandardCharsets.UTF_8); HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED, StandardCharsets.UTF_8);
} }
public HttpParameters(String contentType) { public HttpParameters(CharSequence contentType) {
this(1024, 1024, 65536, this(1024, 1024, 65536,
contentType, StandardCharsets.UTF_8); contentType, StandardCharsets.UTF_8);
} }
public HttpParameters(CharSequence contentType, Charset charset) {
this(1024, 1024, 65536,
contentType, charset);
}
public HttpParameters(int maxParam, int sizeLimit, int elementSizeLimit, public HttpParameters(int maxParam, int sizeLimit, int elementSizeLimit,
CharSequence contentType, Charset charset) { CharSequence contentType, Charset charset) {
this.maxParam = maxParam; this.maxParam = maxParam;
@ -72,7 +75,6 @@ public class HttpParameters implements Map<String, SortedSet<String>> {
this.percentEncoder = PercentEncoders.getQueryEncoder(charset); this.percentEncoder = PercentEncoders.getQueryEncoder(charset);
this.percentDecoder = new PercentDecoder(); this.percentDecoder = new PercentDecoder();
this.contentType = contentType; this.contentType = contentType;
this.charset = charset;
} }
@Override @Override
@ -150,7 +152,7 @@ public class HttpParameters implements Map<String, SortedSet<String>> {
if (percentEncode) { if (percentEncode) {
remove(key); remove(key);
for (String v : values) { for (String v : values) {
add(key, v, true); add(key, v, percentEncode);
} }
return get(key); return get(key);
} else { } else {
@ -165,10 +167,14 @@ public class HttpParameters implements Map<String, SortedSet<String>> {
* @param value the parameter value * @param value the parameter value
* @return the value * @return the value
*/ */
public String add(String key, String value) { public String addRaw(String key, String value) {
return add(key, value, false); return add(key, value, false);
} }
public String add(String key, String value) {
return add(key, value, true);
}
/** /**
* Convenience method to add a single value for the parameter specified by * Convenience method to add a single value for the parameter specified by
* 'key'. * 'key'.
@ -179,7 +185,7 @@ public class HttpParameters implements Map<String, SortedSet<String>> {
* inserted into the map * inserted into the map
* @return the value * @return the value
*/ */
public String add(String key, String value, boolean percentEncode) { private String add(String key, String value, boolean percentEncode) {
String v = null; String v = null;
try { try {
String k = percentEncode ? percentEncoder.encode(key) : key; String k = percentEncode ? percentEncoder.encode(key) : key;
@ -208,11 +214,16 @@ public class HttpParameters implements Map<String, SortedSet<String>> {
* @return null * @return null
*/ */
public String addNull(String key, String nullString) { public String addNull(String key, String nullString) {
return add(key, nullString); return addRaw(key, nullString);
} }
public void addAll(Map<? extends String, ? extends SortedSet<String>> m, boolean percentEncode) public void addAll(String[] keyValuePairs, boolean percentEncode) {
throws MalformedInputException, UnmappableCharacterException { for (int i = 0; i < keyValuePairs.length - 1; i += 2) {
add(keyValuePairs[i], keyValuePairs[i + 1], percentEncode);
}
}
public void addAll(Map<? extends String, ? extends SortedSet<String>> m, boolean percentEncode) {
if (percentEncode) { if (percentEncode) {
for (String key : m.keySet()) { for (String key : m.keySet()) {
put(key, m.get(key), true); put(key, m.get(key), true);
@ -222,12 +233,6 @@ public class HttpParameters implements Map<String, SortedSet<String>> {
} }
} }
public void addAll(String[] keyValuePairs, boolean percentEncode) {
for (int i = 0; i < keyValuePairs.length - 1; i += 2) {
add(keyValuePairs[i], keyValuePairs[i + 1], percentEncode);
}
}
/** /**
* Convenience method to merge a {@code Map<String, List<String>>}. * Convenience method to merge a {@code Map<String, List<String>>}.
* *

View file

@ -30,7 +30,7 @@ public class MalvaMimeMultipartParser implements MimeMultipartParser {
this.type = pos >= 0 ? contentType.substring(0, pos) : contentType; this.type = pos >= 0 ? contentType.substring(0, pos) : contentType;
this.type = type.trim().toLowerCase(); this.type = type.trim().toLowerCase();
this.subType = type.startsWith("multipart") ? type.substring(10).trim() : null; this.subType = type.startsWith("multipart") ? type.substring(10).trim() : null;
Map m = parseHeaderLine(contentType); Map<String, String> m = parseHeaderLine(contentType);
this.boundary = m.containsKey("boundary") ? m.get("boundary").toString().getBytes(StandardCharsets.US_ASCII) : null; this.boundary = m.containsKey("boundary") ? m.get("boundary").toString().getBytes(StandardCharsets.US_ASCII) : null;
} }
} }

View file

@ -5,7 +5,7 @@ import java.util.Map;
public interface MimeMultipart { public interface MimeMultipart {
Map headers(); Map<String, String> headers();
ByteBuf body(); ByteBuf body();

View file

@ -13,7 +13,7 @@ class HttpParametersTest {
@Test @Test
void testParameters() throws MalformedInputException, UnmappableCharacterException { void testParameters() throws MalformedInputException, UnmappableCharacterException {
HttpParameters httpParameters = new HttpParameters(); HttpParameters httpParameters = new HttpParameters();
httpParameters.add("a", "b"); httpParameters.addRaw("a", "b");
String query = httpParameters.getAsQueryString(false); String query = httpParameters.getAsQueryString(false);
assertEquals("a=b", query); assertEquals("a=b", query);
} }
@ -21,7 +21,7 @@ class HttpParametersTest {
@Test @Test
void testUtf8() throws MalformedInputException, UnmappableCharacterException { void testUtf8() throws MalformedInputException, UnmappableCharacterException {
HttpParameters httpParameters = new HttpParameters("text/plain; charset=utf-8"); HttpParameters httpParameters = new HttpParameters("text/plain; charset=utf-8");
httpParameters.add("Hello", "Jörg"); httpParameters.addRaw("Hello", "Jörg");
String query = httpParameters.getAsQueryString(false); String query = httpParameters.getAsQueryString(false);
assertEquals("Hello=Jörg", query); assertEquals("Hello=Jörg", query);
} }

View file

@ -20,21 +20,13 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.embedded.EmbeddedChannel; import io.netty.channel.embedded.EmbeddedChannel;
import io.reactivex.netty.client.pool.PooledConnection; import io.reactivex.netty.client.pool.PooledConnection;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import rx.Observable; import rx.Observable;
import rx.functions.Func0; import rx.functions.Func0;
import rx.functions.Func1; import rx.functions.Func1;
import rx.observers.AssertableSubscriber; import rx.observers.AssertableSubscriber;
import org.junit.Test;
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.observers.AssertableSubscriber;
import java.util.ArrayList;
import java.util.List;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -44,15 +36,12 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static rx.Observable.fromCallable; import static rx.Observable.fromCallable;
import static rx.Observable.just; import static rx.Observable.just;
import static org.hamcrest.MatcherAssert.*;
import static org.hamcrest.Matchers.*;
import static rx.Observable.fromCallable;
import static rx.Observable.just;
/** /**
* This tests the code paths which are not invoked for {@link EmbeddedChannel} as it does not schedule any task * This tests the code paths which are not invoked for {@link EmbeddedChannel} as it does not schedule any task
* (an EmbeddedChannelEventLopp never returns false for isInEventLoop()) * (an EmbeddedChannelEventLopp never returns false for isInEventLoop())
*/ */
@Ignore
public class PoolingWithRealChannelTest { public class PoolingWithRealChannelTest {
@Rule @Rule
@ -67,80 +56,50 @@ public class PoolingWithRealChannelTest {
clientRule.startServer(1); clientRule.startServer(1);
PooledConnection<ByteBuf, ByteBuf> connection = clientRule.connect(); PooledConnection<ByteBuf, ByteBuf> connection = clientRule.connect();
connection.closeNow(); connection.closeNow();
assertThat("Pooled connection is closed.", connection.unsafeNettyChannel().isOpen(), is(true)); assertThat("Pooled connection is closed.", connection.unsafeNettyChannel().isOpen(), is(true));
PooledConnection<ByteBuf, ByteBuf> connection2 = clientRule.connect(); PooledConnection<ByteBuf, ByteBuf> connection2 = clientRule.connect();
assertThat("Connection is not reused.", connection2, is(connection)); assertThat("Connection is not reused.", connection2, is(connection));
} }
@Test
/** /**
*
* Load test to prove concurrency issues mainly seen on heavy load. * Load test to prove concurrency issues mainly seen on heavy load.
*
*/ */
@Test
public void testLoad() { public void testLoad() {
clientRule.startServer(1000); clientRule.startServer(1000);
MockTcpClientEventListener listener = new MockTcpClientEventListener(); MockTcpClientEventListener listener = new MockTcpClientEventListener();
clientRule.getClient().subscribe(listener); clientRule.getClient().subscribe(listener);
int number_of_iterations = 10; // 300
int numberOfRequests = 2; // 10
int number_of_iterations = 300;
int numberOfRequests = 10;
for(int j = 0; j < number_of_iterations; j++) { for(int j = 0; j < number_of_iterations; j++) {
List<Observable<String>> results = new ArrayList<>(); List<Observable<String>> results = new ArrayList<>();
//Just giving the client some time to recover
try { try {
Thread.sleep(100); Thread.sleep(100);
} catch (InterruptedException e) { } catch (InterruptedException e) {
e.printStackTrace(); e.printStackTrace();
} }
for (int i = 0; i < numberOfRequests; i++) { for (int i = 0; i < numberOfRequests; i++) {
results.add( results.add(
fromCallable(new Func0<PooledConnection<ByteBuf, ByteBuf>>() { fromCallable((Func0<PooledConnection<ByteBuf, ByteBuf>>) clientRule::connectWithCheck)
@Override .flatMap((Func1<PooledConnection<ByteBuf, ByteBuf>, Observable<String>>) connection ->
public PooledConnection<ByteBuf, ByteBuf> call() { connection.writeStringAndFlushOnEach(just("Hello"))
return clientRule.connectWithCheck(); .toCompletable()
} .<ByteBuf>toObservable()
}) .concatWith(connection.getInput())
.flatMap(new Func1<PooledConnection<ByteBuf, ByteBuf>, Observable<String>>() { .take(1)
@Override .single()
public Observable<String> call(PooledConnection<ByteBuf, ByteBuf> connection) { .map(byteBuf -> {
return connection.writeStringAndFlushOnEach(just("Hello")) try {
.toCompletable() byte[] bytes = new byte[byteBuf.readableBytes()];
.<ByteBuf>toObservable() byteBuf.readBytes(bytes);
.concatWith(connection.getInput()) return new String(bytes);
.take(1) } finally {
.single() byteBuf.release();
.map(new Func1<ByteBuf, String>() { }
@Override }).doOnError(throwable -> {
public String call(ByteBuf byteBuf) { Assert.fail("Did not expect exception: " + throwable.getMessage());
try { throwable.printStackTrace();
})));
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
String result = new String(bytes);
return result;
} finally {
byteBuf.release();
}
}
}).doOnError(new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Assert.fail("Did not expect exception: " + throwable.getMessage());
throwable.printStackTrace();
}
});
}
}));
} }
AssertableSubscriber<String> test = Observable.merge(results).test(); AssertableSubscriber<String> test = Observable.merge(results).test();
test.awaitTerminalEvent(); test.awaitTerminalEvent();
@ -148,26 +107,22 @@ public class PoolingWithRealChannelTest {
} }
} }
@Test
/** /**
* *
* Load test to prove concurrency issues mainly seen on heavy load. * Load test to prove concurrency issues mainly seen on heavy load.
* *
*/ */
@Test
public void assertPermitsAreReleasedWhenMergingObservablesWithExceptions() { public void assertPermitsAreReleasedWhenMergingObservablesWithExceptions() {
clientRule.startServer(10, true); clientRule.startServer(10, true);
MockTcpClientEventListener listener = new MockTcpClientEventListener(); MockTcpClientEventListener listener = new MockTcpClientEventListener();
clientRule.getClient().subscribe(listener); clientRule.getClient().subscribe(listener);
int number_of_iterations = 1; int number_of_iterations = 1;
int numberOfRequests = 3; int numberOfRequests = 3;
makeRequests(number_of_iterations, numberOfRequests); makeRequests(number_of_iterations, numberOfRequests);
sleep(clientRule.getPoolConfig().getMaxIdleTimeMillis()); sleep(clientRule.getPoolConfig().getMaxIdleTimeMillis());
assertThat("Permits should be 10",
assertThat("Permits should be 10", clientRule.getPoolConfig().getPoolLimitDeterminationStrategy().getAvailablePermits(), equalTo(10)); clientRule.getPoolConfig().getPoolLimitDeterminationStrategy().getAvailablePermits(), equalTo(10));
} }
private void sleep(long i) { private void sleep(long i) {
@ -180,47 +135,29 @@ public class PoolingWithRealChannelTest {
private void makeRequests(int number_of_iterations, int numberOfRequests) { private void makeRequests(int number_of_iterations, int numberOfRequests) {
for (int j = 0; j < number_of_iterations; j++) { for (int j = 0; j < number_of_iterations; j++) {
//List<Observable<String>> results = new ArrayList<>();
sleep(100); sleep(100);
List<Observable<String>> results = new ArrayList<>(); List<Observable<String>> results = new ArrayList<>();
//Just giving the client some time to recover //Just giving the client some time to recover
sleep(100); sleep(100);
for (int i = 0; i < numberOfRequests; i++) { for (int i = 0; i < numberOfRequests; i++) {
results.add( results.add(
fromCallable(new Func0<PooledConnection<ByteBuf, ByteBuf>>() { fromCallable((Func0<PooledConnection<ByteBuf, ByteBuf>>) clientRule::connect)
@Override .flatMap((Func1<PooledConnection<ByteBuf, ByteBuf>, Observable<String>>) connection ->
public PooledConnection<ByteBuf, ByteBuf> call() { connection.writeStringAndFlushOnEach(just("Hello"))
return clientRule.connect(); .toCompletable()
} .<ByteBuf>toObservable()
}) .concatWith(connection.getInput())
.flatMap(new Func1<PooledConnection<ByteBuf, ByteBuf>, Observable<String>>() { .take(1)
@Override .single()
public Observable<String> call(PooledConnection<ByteBuf, ByteBuf> connection) { .map((Func1<ByteBuf, String>) byteBuf -> {
return connection.writeStringAndFlushOnEach(just("Hello")) try {
.toCompletable() byte[] bytes = new byte[byteBuf.readableBytes()];
.<ByteBuf>toObservable() byteBuf.readBytes(bytes);
.concatWith(connection.getInput()) return new String(bytes);
.take(1) } finally {
.single() byteBuf.release();
.map(new Func1<ByteBuf, String>() { }
@Override })));
public String call(ByteBuf byteBuf) {
try {
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
return new String(bytes);
} finally {
byteBuf.release();
}
}
});
}
}));
} }
AssertableSubscriber<String> test = Observable.merge(results).test(); AssertableSubscriber<String> test = Observable.merge(results).test();
test.awaitTerminalEvent(); test.awaitTerminalEvent();

View file

@ -117,7 +117,9 @@ public final class Server implements AutoCloseable {
this.protocolProviders =new ArrayList<>(); this.protocolProviders =new ArrayList<>();
for (ProtocolProvider<HttpChannelInitializer, Transport> provider : ServiceLoader.load(ProtocolProvider.class)) { for (ProtocolProvider<HttpChannelInitializer, Transport> provider : ServiceLoader.load(ProtocolProvider.class)) {
protocolProviders.add(provider); protocolProviders.add(provider);
logger.log(Level.INFO, "protocol provider up: " + provider.transportClass() ); if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "protocol provider up: " + provider.transportClass());
}
} }
this.bootstrap = new ServerBootstrap() this.bootstrap = new ServerBootstrap()
.group(this.parentEventLoopGroup, this.childEventLoopGroup) .group(this.parentEventLoopGroup, this.childEventLoopGroup)

View file

@ -15,14 +15,10 @@ import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors; import java.util.stream.Collectors;
public class HttpEndpointResolver { public class HttpEndpointResolver {
private static final Logger logger = Logger.getLogger(HttpEndpointResolver.class.getName());
private static final int DEFAULT_LIMIT = 1024; private static final int DEFAULT_LIMIT = 1024;
private final List<HttpEndpoint> endpoints; private final List<HttpEndpoint> endpoints;
@ -57,20 +53,12 @@ public class HttpEndpointResolver {
public void handle(List<HttpEndpoint> matchingEndpoints, public void handle(List<HttpEndpoint> matchingEndpoints,
ServerRequest serverRequest, ServerResponse serverResponse) throws IOException { ServerRequest serverRequest, ServerResponse serverResponse) throws IOException {
Objects.requireNonNull(matchingEndpoints); Objects.requireNonNull(matchingEndpoints);
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, () ->
"matching endpoints = " + matchingEndpoints.size() + " --> " + matchingEndpoints);
}
for (HttpEndpoint endpoint : matchingEndpoints) { for (HttpEndpoint endpoint : matchingEndpoints) {
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, () -> "executing endpoint = " + endpoint);
}
endpoint.resolveUriTemplate(serverRequest); endpoint.resolveUriTemplate(serverRequest);
endpoint.before(serverRequest, serverResponse); endpoint.before(serverRequest, serverResponse);
endpointDispatcher.dispatch(endpoint, serverRequest, serverResponse); endpointDispatcher.dispatch(endpoint, serverRequest, serverResponse);
endpoint.after(serverRequest, serverResponse); endpoint.after(serverRequest, serverResponse);
if (serverResponse.getStatus() != null) { if (serverResponse.getStatus() != null) {
logger.log(Level.FINEST, () -> "endpoint " + endpoint + " break, status = " + serverResponse.getStatus());
break; break;
} }
} }
@ -122,14 +110,8 @@ public class HttpEndpointResolver {
HttpEndpoint prefixedEndpoint = HttpEndpoint.builder(endpoint) HttpEndpoint prefixedEndpoint = HttpEndpoint.builder(endpoint)
.setPrefix(prefix + endpoint.getPrefix()) .setPrefix(prefix + endpoint.getPrefix())
.build(); .build();
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, () -> "prefix " + prefix + ": adding endpoint = " + prefixedEndpoint);
}
endpoints.add(prefixedEndpoint); endpoints.add(prefixedEndpoint);
} else { } else {
if (logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, () -> "adding endpoint = " + endpoint);
}
endpoints.add(endpoint); endpoints.add(endpoint);
} }
return this; return this;

View file

@ -9,6 +9,7 @@ import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.HttpUtil;
import org.xbib.net.Pair; import org.xbib.net.Pair;
import org.xbib.net.PercentDecoder;
import org.xbib.net.QueryParameters; import org.xbib.net.QueryParameters;
import org.xbib.net.URL; import org.xbib.net.URL;
import org.xbib.netty.http.common.HttpParameters; import org.xbib.netty.http.common.HttpParameters;
@ -45,7 +46,7 @@ public class HttpServerRequest implements ServerRequest {
private String contextPath; private String contextPath;
private Map<String, String> pathParameters = new LinkedHashMap<>(); private Map<String, String> pathParameters;
private HttpParameters parameters; private HttpParameters parameters;
@ -63,6 +64,7 @@ public class HttpServerRequest implements ServerRequest {
ChannelHandlerContext ctx) { ChannelHandlerContext ctx) {
this.httpRequest = fullHttpRequest.retainedDuplicate(); this.httpRequest = fullHttpRequest.retainedDuplicate();
this.ctx = ctx; this.ctx = ctx;
this.pathParameters = new LinkedHashMap<>();
} }
void handleParameters() { void handleParameters() {
@ -95,13 +97,21 @@ public class HttpServerRequest implements ServerRequest {
logger.log(Level.FINER, "html form, charset = " + htmlCharset + " param body = " + params); logger.log(Level.FINER, "html form, charset = " + htmlCharset + " param body = " + params);
} }
queryParameters.addPercentEncodedBody(params); queryParameters.addPercentEncodedBody(params);
queryParameters.add("_raw", params);
} }
} }
} }
HttpParameters httpParameters = new HttpParameters(); // copy to HTTP parameters but percent-decoded (looks very clumsy)
PercentDecoder percentDecoder = new PercentDecoder(charset.newDecoder()
.onMalformedInput(CodingErrorAction.REPLACE)
.onUnmappableCharacter(CodingErrorAction.REPLACE));
HttpParameters httpParameters = new HttpParameters(mimeType, charset);
for (Pair<String, String> pair : queryParameters) { for (Pair<String, String> pair : queryParameters) {
httpParameters.add(pair.getFirst(), pair.getSecond()); try {
httpParameters.addRaw(percentDecoder.decode(pair.getFirst()), percentDecoder.decode(pair.getSecond()));
} catch (Exception e) {
// does not happen
throw new IllegalArgumentException(pair.toString());
}
} }
this.parameters = httpParameters; this.parameters = httpParameters;
} }
@ -138,7 +148,7 @@ public class HttpServerRequest implements ServerRequest {
@Override @Override
public void addPathParameter(String key, String value) throws IOException { public void addPathParameter(String key, String value) throws IOException {
pathParameters.put(key, value); pathParameters.put(key, value);
parameters.add(key, value); parameters.addRaw(key, value);
} }
@Override @Override

View file

@ -152,7 +152,6 @@ class PostTest {
Server server = Server.builder(domain) Server server = Server.builder(domain)
.build(); .build();
Client client = Client.builder() Client client = Client.builder()
.enableDebug()
.build(); .build();
try { try {
server.accept(); server.accept();
@ -210,7 +209,6 @@ class PostTest {
Server server = Server.builder(domain) Server server = Server.builder(domain)
.build(); .build();
Client client = Client.builder() Client client = Client.builder()
.enableDebug()
.build(); .build();
try { try {
server.accept(); server.accept();
@ -220,14 +218,15 @@ class PostTest {
success1.set(true); success1.set(true);
} }
}; };
Request postRequest = Request.post().setVersion(HttpVersion.HTTP_1_1) Request postRequest = Request.post()
.setVersion(HttpVersion.HTTP_1_1)
.url(server.getServerConfig().getAddress().base().resolve("/post/test.txt")) .url(server.getServerConfig().getAddress().base().resolve("/post/test.txt"))
.contentType(HttpHeaderValues.TEXT_PLAIN, StandardCharsets.UTF_8) .contentType(HttpHeaderValues.TEXT_PLAIN, StandardCharsets.UTF_8)
// you can not pass form parameters on content type "text/plain"
.addParameter("a", "b") .addParameter("a", "b")
// test 'plus' encoding .addParameter("my param", "my value")
.addFormParameter("my param", "my value") .addParameter("withoutplus", "Hello World")
.addFormParameter("withoutplus", "Hello World") .addParameter("name", "Jörg")
.addFormParameter("name", "Jörg")
.setResponseListener(responseListener) .setResponseListener(responseListener)
.build(); .build();
client.execute(postRequest).get(); client.execute(postRequest).get();
@ -255,7 +254,7 @@ class PostTest {
if ("myÿvalue".equals(parameters.getFirst("my param"))) { if ("myÿvalue".equals(parameters.getFirst("my param"))) {
success1.set(true); success1.set(true);
} }
if ("b%YYc".equals(parameters.getFirst("a"))) { if ("bÿc".equals(parameters.getFirst("a"))) {
success2.set(true); success2.set(true);
} }
ServerResponse.write(resp, HttpResponseStatus.OK); ServerResponse.write(resp, HttpResponseStatus.OK);

View file

@ -248,10 +248,9 @@ class CleartextTest {
try { try {
for (int i = 0; i < loop; i++) { for (int i = 0; i < loop; i++) {
String payload = t + "/" + i; String payload = t + "/" + i;
// note that we do not set url() in the request
Request request = Request.get() Request request = Request.get()
.setVersion("HTTP/2.0") .setVersion("HTTP/2.0")
//.url(server1.getServerConfig().getAddress().base())
.uri("/")
.content(payload, "text/plain") .content(payload, "text/plain")
.setResponseListener(responseListener) .setResponseListener(responseListener)
.build(); .build();