many test fixes, bytebuf leaks, add TLS protocol for client

This commit is contained in:
Jörg Prante 2019-08-25 23:26:13 +02:00
parent 712dd570e7
commit 47a1176048
72 changed files with 1036 additions and 609 deletions

View file

@ -1,7 +1,6 @@
plugins {
id "com.github.spotbugs" version "2.0.0"
id "org.sonarqube" version "2.6.1"
id "io.codearte.nexus-staging" version "0.11.0"
id "io.codearte.nexus-staging" version "0.21.0"
id "org.xbib.gradle.plugin.asciidoctor" version "1.5.6.0.1"
}
@ -10,13 +9,7 @@ apply plugin: "io.codearte.nexus-staging"
subprojects {
apply plugin: 'java'
apply plugin: 'maven'
apply plugin: 'signing'
apply plugin: "com.github.spotbugs"
configurations {
asciidoclet
}
dependencies {
testCompile "org.junit.jupiter:junit-jupiter-api:${project.property('junit.version')}"
@ -24,7 +17,6 @@ subprojects {
testCompile "org.junit.jupiter:junit-jupiter-engine:${project.property('junit.version')}"
testCompile "org.junit.vintage:junit-vintage-engine:${project.property('junit.version')}"
testCompile "junit:junit:${project.property('junit4.version')}"
asciidoclet "org.asciidoctor:asciidoclet:${project.property('asciidoclet.version')}"
}
compileJava {
@ -52,10 +44,10 @@ subprojects {
test {
useJUnitPlatform()
systemProperty 'java.util.logging.config.file', 'src/test/resources/logging.properties'
failFast = false
testLogging {
events 'STARTED', 'PASSED', 'FAILED', 'SKIPPED'
showStandardStreams = false
}
afterSuite { desc, result ->
if (!desc.parent) {
@ -84,6 +76,21 @@ subprojects {
'source-highlighter': 'coderay'
}
spotbugs {
toolVersion = '3.1.12'
sourceSets = [sourceSets.main]
ignoreFailures = true
effort = "max"
reportLevel = "high"
// includeFilter = file("config/findbugs/findbugs-include.xml")
// excludeFilter = file("config/findbugs/findbugs-excludes.xml")
}
tasks.withType(com.github.spotbugs.SpotBugsTask) {
reports.xml.enabled = false
reports.html.enabled = true
}
/*javadoc {
options.docletpath = configurations.asciidoclet.files.asType(List)
options.doclet = "org.xbib.asciidoclet.Asciidoclet"
@ -115,12 +122,73 @@ subprojects {
ext {
user = 'jprante'
name = 'netty-http'
description = 'HTTP client and server for Netty'
scmUrl = 'https://github.com/' + user + '/' + name
scmConnection = 'scm:git:git://github.com/' + user + '/' + name + '.git'
scmDeveloperConnection = 'scm:git:git://github.com/' + user + '/' + name + '.git'
projectDescription = 'HTTP client and server for Netty'
scmUrl = 'https://github.com/jprante/netty-http'
scmConnection = 'scm:git:git://github.com/jprante/netty-http.git'
scmDeveloperConnection = 'scm:git:git://github.com/jprante/netty-http.git'
inceptionDate = '2012'
organizationName = 'xbib'
organizationUrl = 'http://xbib.org'
licenseName = 'The Apache License, Version 2.0'
licenseUrl = 'http://www.apache.org/licenses/LICENSE-2.0.txt'
}
/*publishing {
publications {
mavenJava(MavenPublication) {
from components.java
groupId project.group
artifactId project.name
version project.version
artifact sourcesJar
artifact javadocJar
pom {
name = project.name
description = projectDescription
inceptionYear = inceptionDate
url = scmUrl
organization {
name = organizationName
url = organizationUrl
}
scm {
url = scmUrl
connection = scmConnection
developerConnection = scmDeveloperConnection
}
licenses {
license {
name = licenseName
url = licenseUrl
}
}
developers {
developer {
id = user
name = 'Jörg Prante'
email = 'joergprante@gmail.com'
url = 'https://github.com/jprante'
}
}
}
}
}
repositories {
maven {
url "https://oss.sonatype.org/service/local/staging/deploy/maven2"
credentials {
username ossrhUsername
password ossrhPassword
}
}
}
}
signing {
sign publishing.publications.mavenJava
}*/
task sonaTypeUpload(type: Upload) {
group = 'publish'
configuration = configurations.archives
@ -142,11 +210,11 @@ subprojects {
name project.name
description description
packaging 'jar'
inceptionYear '2012'
inceptionYear inceptionDate
url scmUrl
organization {
name 'xbib'
url 'http://xbib.org'
name organizationName
url organizationUrl
}
developers {
developer {
@ -163,8 +231,8 @@ subprojects {
}
licenses {
license {
name 'The Apache License, Version 2.0'
url 'http://www.apache.org/licenses/LICENSE-2.0.txt'
name licenseName
url licenseUrl
}
}
}
@ -172,34 +240,7 @@ subprojects {
}
}
}
spotbugs {
toolVersion = '3.1.12'
sourceSets = [sourceSets.main]
ignoreFailures = true
effort = "max"
reportLevel = "high"
// includeFilter = file("config/findbugs/findbugs-include.xml")
// excludeFilter = file("config/findbugs/findbugs-excludes.xml")
}
// To generate an HTML report instead of XML
tasks.withType(com.github.spotbugs.SpotBugsTask) {
reports.xml.enabled = false
reports.html.enabled = true
nexusStaging {
packageGroup = "org.xbib"
}
}
sonarqube {
properties {
property "sonar.projectName", "${project.group} ${project.name}"
property "sonar.sourceEncoding", "UTF-8"
property "sonar.tests", "src/test/java"
property "sonar.scm.provider", "git"
property "sonar.junit.reportsPath", "build/test-results/test/"
}
}
nexusStaging {
packageGroup = "org.xbib"
}

View file

@ -1,30 +1,28 @@
group = org.xbib
name = netty-http
version = 4.1.38.3
version = 4.1.39.0
# main packages
netty.version = 4.1.38.Final
# netty
netty.version = 4.1.39.Final
tcnative.version = 2.0.25.Final
# common
# for netty-http-common
xbib-net-url.version = 2.0.0
# server
bouncycastle.version = 1.61
# for netty-http-server
bouncycastle.version = 1.62
# reactive
# for netty-http-server-reactive
reactivestreams.version = 1.0.2
# rest
# for netty-http-server-rest
xbib-guice.version = 4.0.4
# test
junit.version = 5.5.1
junit4.version = 4.12
conscrypt.version = 2.0.0
conscrypt.version = 2.2.1
jackson.version = 2.9.9
# doc
asciidoclet.version = 1.5.4
org.gradle.warning.mode = all

View file

@ -1,5 +1,5 @@
#Tue Aug 06 15:30:36 CEST 2019
distributionUrl=https\://services.gradle.org/distributions/gradle-5.3.1-all.zip
#Sun Aug 18 22:06:23 CEST 2019
distributionUrl=https\://services.gradle.org/distributions/gradle-5.6-all.zip
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStorePath=wrapper/dists

6
gradlew vendored
View file

@ -7,7 +7,7 @@
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
@ -125,8 +125,8 @@ if $darwin; then
GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
fi
# For Cygwin, switch paths to Windows format before running java
if $cygwin ; then
# For Cygwin or MSYS, switch paths to Windows format before running java
if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
JAVACMD=`cygpath --unix "$JAVACMD"`

2
gradlew.bat vendored
View file

@ -5,7 +5,7 @@
@rem you may not use this file except in compliance with the License.
@rem You may obtain a copy of the License at
@rem
@rem http://www.apache.org/licenses/LICENSE-2.0
@rem https://www.apache.org/licenses/LICENSE-2.0
@rem
@rem Unless required by applicable law or agreed to in writing, software
@rem distributed under the License is distributed on an "AS IS" BASIS,

View file

@ -1,6 +1,7 @@
package org.xbib.netty.http.client.rest;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.http.HttpMethod;
import org.xbib.net.URL;
import org.xbib.netty.http.client.Client;
@ -11,6 +12,7 @@ import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
public class RestClient {
@ -18,11 +20,14 @@ public class RestClient {
private HttpResponse response;
private ByteBuf byteBuf;
private RestClient() {
}
public void setResponse(HttpResponse response) {
this.response = response;
this.byteBuf = response != null ? response.getBody().retain() : null;
}
public HttpResponse getResponse() {
@ -34,7 +39,6 @@ public class RestClient {
}
public String asString(Charset charset) {
ByteBuf byteBuf = response != null ? response.getBody() : null;
return byteBuf != null && byteBuf.isReadable() ? byteBuf.toString(charset) : null;
}
@ -43,11 +47,11 @@ public class RestClient {
}
public static RestClient get(String urlString) throws IOException {
return method(urlString, null, null, HttpMethod.GET);
return method(urlString, HttpMethod.GET);
}
public static RestClient delete(String urlString) throws IOException {
return method(urlString, null, null, HttpMethod.DELETE);
return method(urlString, HttpMethod.DELETE);
}
public static RestClient post(String urlString, String body) throws IOException {
@ -66,28 +70,31 @@ public class RestClient {
return method(urlString, content, HttpMethod.PUT);
}
public static RestClient method(String urlString,
HttpMethod httpMethod) throws IOException {
return method(urlString, Unpooled.buffer(), httpMethod);
}
public static RestClient method(String urlString,
String body, Charset charset,
HttpMethod httpMethod) throws IOException {
ByteBuf byteBuf = null;
if (body != null && charset != null) {
byteBuf = client.getByteBufAllocator().buffer();
byteBuf.writeCharSequence(body, charset);
}
Objects.requireNonNull(body);
Objects.requireNonNull(charset);
ByteBuf byteBuf = client.getByteBufAllocator().buffer();
byteBuf.writeCharSequence(body, charset);
return method(urlString, byteBuf, httpMethod);
}
public static RestClient method(String urlString,
ByteBuf byteBuf,
HttpMethod httpMethod) throws IOException {
Objects.requireNonNull(byteBuf);
URL url = URL.create(urlString);
RestClient restClient = new RestClient();
Request.Builder requestBuilder = Request.builder(httpMethod).url(url);
if (byteBuf != null) {
requestBuilder.content(byteBuf);
}
requestBuilder.content(byteBuf);
client.newTransport(HttpAddress.http1(url))
.execute(requestBuilder.build().setResponseListener(restClient::setResponse)).get();
.execute(requestBuilder.build().setResponseListener(restClient::setResponse)).close();
return restClient;
}
}

View file

@ -0,0 +1,32 @@
package org.xbib.netty.http.client.rest;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import java.util.logging.ConsoleHandler;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
public class NettyHttpTestExtension implements BeforeAllCallback {
@Override
public void beforeAll(ExtensionContext context) {
System.setProperty("io.netty.noUnsafe", Boolean.toString(true));
System.setProperty("io.netty.leakDetection.level", "ADVANCED");
Level level = Level.INFO;
System.setProperty("java.util.logging.SimpleFormatter.format",
"%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$-7s [%3$s] %5$s %6$s%n");
LogManager.getLogManager().reset();
Logger rootLogger = LogManager.getLogManager().getLogger("");
Handler handler = new ConsoleHandler();
handler.setFormatter(new SimpleFormatter());
rootLogger.addHandler(handler);
rootLogger.setLevel(level);
for (Handler h : rootLogger.getHandlers()) {
handler.setFormatter(new SimpleFormatter());
h.setLevel(level);
}
}
}

View file

@ -1,10 +1,12 @@
package org.xbib.netty.http.client.rest;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import java.io.IOException;
import java.util.logging.Logger;
@ExtendWith(NettyHttpTestExtension.class)
class RestClientTest {
private static final Logger logger = Logger.getLogger(RestClientTest.class.getName());

View file

@ -57,11 +57,12 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
public final class Client {
public final class Client implements AutoCloseable {
private static final Logger logger = Logger.getLogger(Client.class.getName());
@ -79,6 +80,9 @@ public final class Client {
System.setProperty("io.netty.noKeySetOptimization", Boolean.toString(true));
}
}
private static final AtomicLong requestCounter = new AtomicLong();
private static final AtomicLong responseCounter = new AtomicLong();
private final ClientConfig clientConfig;
@ -140,7 +144,7 @@ public final class Client {
ClientChannelPoolHandler clientChannelPoolHandler = new ClientChannelPoolHandler();
this.pool = new BoundedChannelPool<>(semaphore, clientConfig.getPoolVersion(),
nodes, bootstrap, clientChannelPoolHandler, retries,
BoundedChannelPool.PoolKeySelectorType.ROUNDROBIN);
clientConfig.getPoolKeySelectorType());
Integer nodeConnectionLimit = clientConfig.getPoolNodeConnectionLimit();
if (nodeConnectionLimit == null || nodeConnectionLimit == 0) {
nodeConnectionLimit = nodes.size();
@ -150,6 +154,7 @@ public final class Client {
} catch (Exception e) {
logger.log(Level.SEVERE, e.getMessage(), e);
}
logger.log(Level.FINE, "client pool prepared: size = " + nodeConnectionLimit);
}
}
@ -182,6 +187,14 @@ public final class Client {
logger.log(level, NetworkUtils::displayNetworkInterfaces);
}
public AtomicLong getRequestCounter() {
return requestCounter;
}
public AtomicLong getResponseCounter() {
return responseCounter;
}
public Transport newTransport() {
return newTransport(null);
}
@ -293,8 +306,21 @@ public final class Client {
close(transport);
}
@Override
public void close() {
try {
shutdownGracefully();
} catch (IOException e) {
logger.log(Level.SEVERE, e.getMessage(), e);
}
}
public void shutdownGracefully() throws IOException {
logger.log(Level.FINE, "shutting down gracefully");
shutdownGracefully(30L, TimeUnit.SECONDS);
}
public void shutdownGracefully(long amount, TimeUnit timeUnit) throws IOException {
logger.log(Level.FINE, "shutting down");
for (Transport transport : transports) {
close(transport);
}
@ -302,12 +328,11 @@ public final class Client {
if (hasPooledConnections()) {
pool.close();
}
logger.log(Level.FINE, "shutting down");
eventLoopGroup.shutdownGracefully();
eventLoopGroup.shutdownGracefully(1L, amount, timeUnit);
try {
eventLoopGroup.awaitTermination(10L, TimeUnit.SECONDS);
eventLoopGroup.awaitTermination(amount, timeUnit);
} catch (InterruptedException e) {
// ignore
throw new IOException(e);
}
}
@ -359,14 +384,17 @@ public final class Client {
default:
break;
}
engine.setEnabledProtocols(clientConfig.getProtocols());
return sslHandler;
}
private static SslContext newSslContext(ClientConfig clientConfig, HttpVersion httpVersion) throws SSLException {
// Conscrypt?
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient()
.sslProvider(clientConfig.getSslProvider())
.ciphers(clientConfig.getCiphers(), clientConfig.getCipherSuiteFilter())
.applicationProtocolConfig(newApplicationProtocolConfig(httpVersion));
if (clientConfig.getSslContextProvider() != null) {
sslContextBuilder.sslContextProvider(clientConfig.getSslContextProvider());
}
@ -415,12 +443,14 @@ public final class Client {
HttpAddress httpAddress = channel.attr(pool.getAttributeKey()).get();
HttpVersion httpVersion = httpAddress.getVersion();
SslContext sslContext = newSslContext(clientConfig, httpAddress.getVersion());
SslHandlerFactory sslHandlerFactory = new SslHandlerFactory(sslContext, clientConfig, httpAddress, byteBufAllocator);
SslHandlerFactory sslHandlerFactory = new SslHandlerFactory(sslContext,
clientConfig, httpAddress, byteBufAllocator);
Http2ChannelInitializer http2ChannelInitializer =
new Http2ChannelInitializer(clientConfig, httpAddress, sslHandlerFactory);
if (httpVersion.majorVersion() == 1) {
HttpChannelInitializer initializer =
new HttpChannelInitializer(clientConfig, httpAddress, sslHandlerFactory, http2ChannelInitializer);
new HttpChannelInitializer(clientConfig, httpAddress,
sslHandlerFactory, http2ChannelInitializer);
initializer.initChannel(channel);
} else {
http2ChannelInitializer.initChannel(channel);
@ -428,7 +458,7 @@ public final class Client {
}
}
public class SslHandlerFactory {
public static class SslHandlerFactory {
private final SslContext sslContext;
@ -438,7 +468,8 @@ public final class Client {
private final ByteBufAllocator allocator;
SslHandlerFactory(SslContext sslContext, ClientConfig clientConfig, HttpAddress httpAddress, ByteBufAllocator allocator) {
SslHandlerFactory(SslContext sslContext, ClientConfig clientConfig,
HttpAddress httpAddress, ByteBufAllocator allocator) {
this.sslContext = sslContext;
this.clientConfig = clientConfig;
this.httpAddress = httpAddress;
@ -559,7 +590,7 @@ public final class Client {
return this;
}
public Builder setEnableGzip(boolean enableGzip) {
public Builder enableGzip(boolean enableGzip) {
clientConfig.setEnableGzip(enableGzip);
return this;
}
@ -586,6 +617,11 @@ public final class Client {
return this;
}
public Builder setTlsProtocols(String[] protocols) {
clientConfig.setProtocols(protocols);
return this;
}
public Builder setCiphers(Iterable<String> ciphers) {
clientConfig.setCiphers(ciphers);
return this;

View file

@ -8,6 +8,8 @@ import io.netty.handler.logging.LogLevel;
import io.netty.handler.proxy.HttpProxyHandler;
import io.netty.handler.ssl.CipherSuiteFilter;
import io.netty.handler.ssl.SslProvider;
import org.xbib.netty.http.client.pool.BoundedChannelPool;
import org.xbib.netty.http.client.pool.Pool;
import org.xbib.netty.http.client.retry.BackOff;
import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.common.security.SecurityUtil;
@ -18,6 +20,7 @@ import java.security.KeyStore;
import java.security.Provider;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
public class ClientConfig {
@ -118,6 +121,11 @@ public class ClientConfig {
*/
Provider SSL_CONTEXT_PROVIDER = null;
/**
* Transport layer security protocol versions.
*/
String[] PROTOCOLS = new String[] { "TLSv1.3", "TLSv1.2" };
/**
* Default ciphers. We care about HTTP/2.
*/
@ -143,6 +151,8 @@ public class ClientConfig {
*/
HttpVersion POOL_VERSION = HttpVersion.HTTP_1_1;
Pool.PoolKeySelectorType POOL_KEY_SELECTOR_TYPE = Pool.PoolKeySelectorType.ROUNDROBIN;
/**
* Default connection pool security.
*/
@ -204,6 +214,8 @@ public class ClientConfig {
private Provider sslContextProvider = Defaults.SSL_CONTEXT_PROVIDER;
private String[] protocols = Defaults.PROTOCOLS;
private Iterable<String> ciphers = Defaults.CIPHERS;
private CipherSuiteFilter cipherSuiteFilter = Defaults.CIPHER_SUITE_FILTER;
@ -224,6 +236,8 @@ public class ClientConfig {
private List<HttpAddress> poolNodes = new ArrayList<>();
private Pool.PoolKeySelectorType poolKeySelectorType = Defaults.POOL_KEY_SELECTOR_TYPE;
private Integer poolNodeConnectionLimit;
private Integer retriesPerPoolNode = Defaults.RETRIES_PER_NODE;
@ -465,6 +479,15 @@ public class ClientConfig {
return sslContextProvider;
}
public ClientConfig setProtocols(String[] protocols) {
this.protocols = protocols;
return this;
}
public String[] getProtocols() {
return protocols;
}
public ClientConfig setCiphers(Iterable<String> ciphers) {
this.ciphers = ciphers;
return this;
@ -536,6 +559,15 @@ public class ClientConfig {
return poolNodes;
}
public ClientConfig setPoolKeySelectorType(Pool.PoolKeySelectorType poolKeySelectorType) {
this.poolKeySelectorType = poolKeySelectorType;
return this;
}
public Pool.PoolKeySelectorType getPoolKeySelectorType() {
return poolKeySelectorType;
}
public ClientConfig addPoolNode(HttpAddress poolNodeAddress) {
this.poolNodes.add(poolNodeAddress);
return this;

View file

@ -309,6 +309,8 @@ public class Request {
private URL url;
private String uri;
private HttpParameters uriParameters;
private HttpParameters formParameters;
@ -327,21 +329,21 @@ public class Request {
Builder(ByteBufAllocator allocator) {
this.allocator = allocator;
httpMethod = DEFAULT_METHOD;
httpVersion = DEFAULT_HTTP_VERSION;
userAgent = DEFAULT_USER_AGENT;
gzip = DEFAULT_GZIP;
keepalive = DEFAULT_KEEPALIVE;
url = DEFAULT_URL;
timeoutInMillis = DEFAULT_TIMEOUT_MILLIS;
followRedirect = DEFAULT_FOLLOW_REDIRECT;
maxRedirects = DEFAULT_MAX_REDIRECT;
headers = new DefaultHttpHeaders();
removeHeaders = new ArrayList<>();
cookies = new HashSet<>();
encoder = PercentEncoders.getQueryEncoder(StandardCharsets.UTF_8);
uriParameters = new HttpParameters();
formParameters = new HttpParameters(DEFAULT_FORM_CONTENT_TYPE);
this.httpMethod = DEFAULT_METHOD;
this.httpVersion = DEFAULT_HTTP_VERSION;
this.userAgent = DEFAULT_USER_AGENT;
this.gzip = DEFAULT_GZIP;
this.keepalive = DEFAULT_KEEPALIVE;
this.url = DEFAULT_URL;
this.timeoutInMillis = DEFAULT_TIMEOUT_MILLIS;
this.followRedirect = DEFAULT_FOLLOW_REDIRECT;
this.maxRedirects = DEFAULT_MAX_REDIRECT;
this.headers = new DefaultHttpHeaders();
this.removeHeaders = new ArrayList<>();
this.cookies = new HashSet<>();
this.encoder = PercentEncoders.getQueryEncoder(StandardCharsets.UTF_8);
this.uriParameters = new HttpParameters();
this.formParameters = new HttpParameters(DEFAULT_FORM_CONTENT_TYPE);
}
public Builder setMethod(HttpMethod httpMethod) {
@ -394,7 +396,7 @@ public class Request {
}
public Builder uri(String uri) {
this.url = url.resolve(uri);
this.uri = uri;
return this;
}
@ -482,12 +484,22 @@ public class Request {
}
public Builder text(String text) {
content(ByteBufUtil.writeUtf8(allocator, text), HttpHeaderValues.TEXT_PLAIN);
ByteBuf byteBuf = ByteBufUtil.writeUtf8(allocator, text);
try {
content(byteBuf, HttpHeaderValues.TEXT_PLAIN);
} finally {
byteBuf.release();
}
return this;
}
public Builder json(String json) {
content(ByteBufUtil.writeUtf8(allocator, json), HttpHeaderValues.APPLICATION_JSON);
ByteBuf byteBuf = ByteBufUtil.writeUtf8(allocator, json);
try {
content(byteBuf, HttpHeaderValues.APPLICATION_JSON);
} finally {
byteBuf.release();
}
return this;
}
@ -518,8 +530,46 @@ public class Request {
}
public Request build() {
if (url == null) {
throw new IllegalStateException("URL not set");
DefaultHttpHeaders validatedHeaders = new DefaultHttpHeaders(true);
if (url != null) {
// attach user query parameters to URL
URL.Builder mutator = url.mutator();
uriParameters.forEach((k, v) -> v.forEach(value -> mutator.queryParam(k, value)));
url = mutator.build();
// let Netty's query string decoder/encoder work over the URL to add parameters given implicitly in url()
String path = url.getPath();
String query = url.getQuery();
QueryStringDecoder queryStringDecoder = new QueryStringDecoder(query != null ? path + "?" + query : path, StandardCharsets.UTF_8);
QueryStringEncoder queryStringEncoder = new QueryStringEncoder(queryStringDecoder.path());
for (Map.Entry<String, List<String>> entry : queryStringDecoder.parameters().entrySet()) {
for (String value : entry.getValue()) {
queryStringEncoder.addParam(entry.getKey(), value);
}
}
// build uri from QueryStringDecoder
String pathAndQuery = queryStringEncoder.toString();
StringBuilder sb = new StringBuilder();
if (!pathAndQuery.isEmpty()) {
sb.append(pathAndQuery);
}
String fragment = url.getFragment();
if (fragment != null && !fragment.isEmpty()) {
sb.append('#').append(fragment);
}
this.uri = sb.toString(); // the encoded form of path/query/fragment
validatedHeaders.set(headers);
String scheme = url.getScheme();
if (httpVersion.majorVersion() == 2) {
validatedHeaders.set(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), scheme);
}
validatedHeaders.set(HttpHeaderNames.HOST, url.getHostInfo());
}
validatedHeaders.set(HttpHeaderNames.DATE, DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now(ZoneOffset.UTC)));
if (userAgent != null) {
validatedHeaders.set(HttpHeaderNames.USER_AGENT, userAgent);
}
if (gzip) {
validatedHeaders.set(HttpHeaderNames.ACCEPT_ENCODING, "gzip");
}
// form parameters
if (!formParameters.isEmpty()) {
@ -530,47 +580,7 @@ public class Request {
throw new IllegalArgumentException();
}
}
// attach user query parameters to URL
URL.Builder mutator = url.mutator();
uriParameters.forEach((k, v) -> v.forEach(value -> mutator.queryParam(k, value)));
url = mutator.build();
Objects.requireNonNull(url.getHost());
// let Netty's query string decoder/encoder work over the URL to add parameters given implicitly in url()
String path = url.getPath();
String query = url.getQuery();
QueryStringDecoder queryStringDecoder = new QueryStringDecoder(query != null ? path + "?" + query : path, StandardCharsets.UTF_8);
QueryStringEncoder queryStringEncoder = new QueryStringEncoder(queryStringDecoder.path());
for (Map.Entry<String, List<String>> entry : queryStringDecoder.parameters().entrySet()) {
for (String value : entry.getValue()) {
queryStringEncoder.addParam(entry.getKey(), value);
}
}
// build uri from QueryStringDecoder
String pathAndQuery = queryStringEncoder.toString();
StringBuilder sb = new StringBuilder();
if (!pathAndQuery.isEmpty()) {
sb.append(pathAndQuery);
}
String fragment = url.getFragment();
if (fragment != null && !fragment.isEmpty()) {
sb.append('#').append(fragment);
}
String uri = sb.toString(); // the encoded form of path/query/fragment
DefaultHttpHeaders validatedHeaders = new DefaultHttpHeaders(true);
validatedHeaders.set(headers);
String scheme = url.getScheme();
if (httpVersion.majorVersion() == 2) {
validatedHeaders.set(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), scheme);
}
validatedHeaders.set(HttpHeaderNames.HOST, url.getHostInfo());
validatedHeaders.set(HttpHeaderNames.DATE, DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.now(ZoneOffset.UTC)));
if (userAgent != null) {
validatedHeaders.set(HttpHeaderNames.USER_AGENT, userAgent);
}
if (gzip) {
validatedHeaders.set(HttpHeaderNames.ACCEPT_ENCODING, "gzip");
}
int length = content != null ? content.capacity() : 0;
int length = content != null ? content.readableBytes() : 0;
if (!validatedHeaders.contains(HttpHeaderNames.CONTENT_LENGTH) && !validatedHeaders.contains(HttpHeaderNames.TRANSFER_ENCODING)) {
if (length < 0) {
validatedHeaders.set(HttpHeaderNames.TRANSFER_ENCODING, "chunked");

View file

@ -5,16 +5,14 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import org.xbib.netty.http.client.transport.Transport;
import org.xbib.netty.http.common.DefaultHttpResponse;
@ChannelHandler.Sharable
public class HttpResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
@Override
public void channelRead0(ChannelHandlerContext ctx, FullHttpResponse httpResponse) throws Exception {
public void channelRead0(ChannelHandlerContext ctx, FullHttpResponse fullHttpResponse) throws Exception {
Transport transport = ctx.channel().attr(Transport.TRANSPORT_ATTRIBUTE_KEY).get();
transport.responseReceived(ctx.channel(),null,
new DefaultHttpResponse(transport.getHttpAddress(), httpResponse.retain()));
transport.responseReceived(ctx.channel(), null, fullHttpResponse);
}
@Override

View file

@ -5,6 +5,9 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http2.DefaultHttp2SettingsFrame;
import io.netty.handler.codec.http2.Http2ConnectionPrefaceAndSettingsFrameWrittenEvent;
import io.netty.handler.codec.http2.Http2FrameLogger;
@ -74,6 +77,8 @@ public class Http2ChannelInitializer extends ChannelInitializer<Channel> {
Http2MultiplexCodec multiplexCodec = multiplexCodecBuilder.autoAckSettingsFrame(true) .build();
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("client-multiplex", multiplexCodec);
// does not work
//pipeline.addLast("client-decompressor", new HttpContentDecompressor());
pipeline.addLast("client-messages", new ClientMessages());
}

View file

@ -6,7 +6,6 @@ import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http2.HttpConversionUtil;
import org.xbib.netty.http.client.transport.Transport;
import org.xbib.netty.http.common.DefaultHttpResponse;
@ChannelHandler.Sharable
public class Http2ResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
@ -15,8 +14,7 @@ public class Http2ResponseHandler extends SimpleChannelInboundHandler<FullHttpRe
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse httpResponse) throws Exception {
Transport transport = ctx.channel().attr(Transport.TRANSPORT_ATTRIBUTE_KEY).get();
Integer streamId = httpResponse.headers().getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text());
transport.responseReceived(ctx.channel(), streamId,
new DefaultHttpResponse(transport.getHttpAddress(), httpResponse.retain()));
transport.responseReceived(ctx.channel(), streamId, httpResponse);
}
@Override

View file

@ -111,6 +111,7 @@ public class BoundedChannelPool<K extends PoolKey> implements Pool<Channel> {
counts.put(node, 0);
failedCounts.put(node, 0);
}
logger.log(Level.FINE, "pool is up");
}
public HttpVersion getVersion() {
@ -174,7 +175,7 @@ public class BoundedChannelPool<K extends PoolKey> implements Pool<Channel> {
channelQueue.add(channel);
}
} else if (channel.isOpen() && close) {
logger.log(Level.FINE, "trying to close channel " + channel);
logger.log(Level.FINE, "closing channel " + channel);
channel.close();
}
if (channelPoolhandler != null) {
@ -211,7 +212,7 @@ public class BoundedChannelPool<K extends PoolKey> implements Pool<Channel> {
channelPromise.get();
logger.log(Level.FINE, "goaway frame sent to " + channel);
} catch (ExecutionException e) {
// ignore error if goaway can not be sent
logger.log(Level.FINE, e.getMessage(), e);
} catch (InterruptedException e) {
throw new IOException(e);
}
@ -235,9 +236,8 @@ public class BoundedChannelPool<K extends PoolKey> implements Pool<Channel> {
K key = null;
Integer min = Integer.MAX_VALUE;
Integer next;
//int r = ThreadLocalRandom.current().nextInt(numberOfNodes);
for (int j = 0; j < numberOfNodes; j++) {
K nextKey = poolKeySelector.key(); //nodes.get(j % numberOfNodes);
K nextKey = poolKeySelector.key();
next = counts.get(nextKey);
if (next == null || next == 0) {
key = nextKey;
@ -303,9 +303,9 @@ public class BoundedChannelPool<K extends PoolKey> implements Pool<Channel> {
private Channel poll() {
Queue<Channel> channelQueue;
Channel channel;
//int r = ThreadLocalRandom.current().nextInt(numberOfNodes);
for (int j = 0; j < numberOfNodes; j++) {
K key = poolKeySelector.key(); //nodes.get(j % numberOfNodes);
K key = poolKeySelector.key();
logger.log(Level.FINE, "poll: key = " + key);
channelQueue = availableChannels.get(key);
if (channelQueue != null) {
channel = channelQueue.poll();
@ -319,10 +319,6 @@ public class BoundedChannelPool<K extends PoolKey> implements Pool<Channel> {
return null;
}
public enum PoolKeySelectorType {
RANDOM, ROUNDROBIN
}
private interface PoolKeySelector<K extends PoolKey> {
K key();
}

View file

@ -9,4 +9,8 @@ public interface Pool<T> extends Closeable {
T acquire() throws Exception;
void release(T t, boolean close) throws Exception;
enum PoolKeySelectorType {
RANDOM, ROUNDROBIN
}
}

View file

@ -78,8 +78,8 @@ abstract class BaseTransport implements Transport {
* @return completable future
*/
@Override
public <T> CompletableFuture<T> execute(Request request,
Function<HttpResponse, T> supplier) throws IOException {
public <T> CompletableFuture<T> execute(Request request, Function<HttpResponse, T> supplier)
throws IOException {
Objects.requireNonNull(supplier);
final CompletableFuture<T> completableFuture = new CompletableFuture<>();
request.setResponseListener(response -> {
@ -94,8 +94,10 @@ abstract class BaseTransport implements Transport {
}
@Override
public synchronized void close() {
get();
public void close() {
if (!channels.isEmpty()) {
get();
}
}
@Override
@ -133,25 +135,30 @@ abstract class BaseTransport implements Transport {
@Override
public Transport get(long value, TimeUnit timeUnit) {
if (channels.isEmpty()) {
return this;
}
for (Map.Entry<String, Flow> entry : channelFlowMap.entrySet()) {
Flow flow = entry.getValue();
for (Integer key : flow.keys()) {
try {
flow.get(key).get(value, timeUnit);
} catch (Exception e) {
String requestKey = getRequestKey(entry.getKey(), key);
if (requestKey != null) {
Request request = requests.get(requestKey);
if (request != null && request.getCompletableFuture() != null) {
request.getCompletableFuture().completeExceptionally(e);
if (!flow.isClosed()) {
for (Integer key : flow.keys()) {
try {
flow.get(key).get(value, timeUnit);
} catch (Exception e) {
String requestKey = getRequestKey(entry.getKey(), key);
if (requestKey != null) {
Request request = requests.get(requestKey);
if (request != null && request.getCompletableFuture() != null) {
request.getCompletableFuture().completeExceptionally(e);
}
}
flow.fail(e);
} finally {
flow.remove(key);
}
flow.fail(e);
} finally {
flow.remove(key);
}
flow.close();
}
flow.close();
}
channels.values().forEach(channel -> {
try {
@ -160,14 +167,14 @@ abstract class BaseTransport implements Transport {
logger.log(Level.WARNING, e.getMessage(), e);
}
});
channelFlowMap.clear();
channels.clear();
requests.clear();
return this;
}
@Override
public void cancel() {
if (channels.isEmpty()) {
return;
}
for (Map.Entry<String, Flow> entry : channelFlowMap.entrySet()) {
Flow flow = entry.getValue();
for (Integer key : flow.keys()) {
@ -198,6 +205,7 @@ abstract class BaseTransport implements Transport {
requests.clear();
}
@Override
public SSLSession getSession() {
return sslSession;
}

View file

@ -44,7 +44,7 @@ class Flow {
}
Integer nextStreamId() {
Integer streamId = counter.getAndAdd(2);
int streamId = counter.getAndAdd(2);
if (streamId == Integer.MIN_VALUE) {
// reset if overflow, Java wraps atomic integers to Integer.MIN_VALUE
// should we send a GOAWAY?
@ -65,6 +65,10 @@ class Flow {
map.clear();
}
public boolean isClosed() {
return map.isEmpty();
}
@Override
public String toString() {
return "[next=" + counter + ", " + map + "]";

View file

@ -3,6 +3,7 @@ package org.xbib.netty.http.client.transport;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
@ -22,6 +23,7 @@ import org.xbib.netty.http.client.handler.http2.Http2ResponseHandler;
import org.xbib.netty.http.client.handler.http2.Http2StreamFrameToHttpObjectCodec;
import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.StatusListener;
import org.xbib.netty.http.common.DefaultHttpResponse;
import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.client.listener.ResponseListener;
@ -50,7 +52,7 @@ public class Http2Transport extends BaseTransport {
super(client, httpAddress);
this.settingsPromise = httpAddress != null ? new CompletableFuture<>() : null;
final Transport transport = this;
this.initializer = new ChannelInitializer<Channel>() {
this.initializer = new ChannelInitializer<>() {
@Override
protected void initChannel(Channel ch) {
ch.attr(TRANSPORT_ATTRIBUTE_KEY).set(transport);
@ -104,6 +106,7 @@ public class Http2Transport extends BaseTransport {
childChannel.write(dataFrame);
}
childChannel.flush();
client.getRequestCounter().incrementAndGet();
if (client.hasPooledConnections()) {
client.releaseChannel(channel, false);
}
@ -134,29 +137,38 @@ public class Http2Transport extends BaseTransport {
}
@Override
public void responseReceived(Channel channel, Integer streamId, HttpResponse httpResponse) {
public void responseReceived(Channel channel, Integer streamId, FullHttpResponse fullHttpResponse) {
if (throwable != null) {
logger.log(Level.WARNING, "throwable not null for response " + httpResponse, throwable);
logger.log(Level.WARNING, "throwable is not null?", throwable);
return;
}
if (streamId == null) {
logger.log(Level.WARNING, "stream ID is null for response " + httpResponse);
logger.log(Level.WARNING, "stream ID is null?");
return;
}
// format of childchan channel ID is <parent channel ID> "/" <substream ID>
String channelId = channel.id().toString();
int pos = channelId.indexOf('/');
channelId = pos > 0 ? channelId.substring(0, pos) : channelId;
Flow flow = channelFlowMap.get(channelId);
if (flow == null) {
return;
}
String requestKey = getRequestKey(channelId, streamId);
CompletableFuture<Boolean> promise = flow.get(streamId);
if (promise != null) {
Request request = requests.get(requestKey);
DefaultHttpResponse httpResponse = new DefaultHttpResponse(httpAddress, fullHttpResponse);
client.getResponseCounter().incrementAndGet();
try {
// format of childchan channel ID is <parent channel ID> "/" <substream ID>
String channelId = channel.id().toString();
int pos = channelId.indexOf('/');
channelId = pos > 0 ? channelId.substring(0, pos) : channelId;
Flow flow = channelFlowMap.get(channelId);
if (flow == null) {
if (logger.isLoggable(Level.WARNING)) {
logger.log(Level.WARNING, "flow is null? channelId = " + channelId);
}
return;
}
Request request = requests.remove(getRequestKey(channelId, streamId));
if (request == null) {
promise.completeExceptionally(new IllegalStateException());
CompletableFuture<Boolean> promise = flow.get(streamId);
if (promise != null) {
if (logger.isLoggable(Level.WARNING)) {
logger.log(Level.WARNING, "request is null? channelId = " + channelId + " streamId = " + streamId);
}
promise.completeExceptionally(new IllegalStateException("no request"));
}
} else {
StatusListener statusListener = request.getStatusListener();
if (statusListener != null) {
@ -170,11 +182,12 @@ public class Http2Transport extends BaseTransport {
cookieListener.onCookie(cookie);
}
}
ResponseListener<HttpResponse> responseListener = request.getResponseListener();
if (responseListener != null) {
responseListener.onResponse(httpResponse);
}
CompletableFuture<Boolean> promise = flow.get(streamId);
try {
ResponseListener<HttpResponse> responseListener = request.getResponseListener();
if (responseListener != null) {
responseListener.onResponse(httpResponse);
}
Request retryRequest = retry(request, httpResponse);
if (retryRequest != null) {
// retry transport, wait for completion
@ -186,14 +199,25 @@ public class Http2Transport extends BaseTransport {
client.continuation(this, continueRequest);
}
}
promise.complete(true);
if (promise != null) {
promise.complete(true);
} else {
// when transport is closed, flow map will be emptied
logger.log(Level.FINE, "promise is null, flow lost");
}
} catch (URLSyntaxException | IOException e) {
promise.completeExceptionally(e);
if (promise != null) {
promise.completeExceptionally(e);
} else {
logger.log(Level.FINE, "promise is null, can't abort flow");
}
} finally {
flow.remove(streamId);
}
}
} finally {
httpResponse.release();
}
channelFlowMap.get(channelId).remove(streamId);
requests.remove(requestKey);
}
@Override

View file

@ -3,6 +3,7 @@ package org.xbib.netty.http.client.transport;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings;
@ -13,6 +14,7 @@ import org.xbib.netty.http.client.cookie.ClientCookieDecoder;
import org.xbib.netty.http.client.cookie.ClientCookieEncoder;
import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.StatusListener;
import org.xbib.netty.http.common.DefaultHttpResponse;
import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.client.listener.ResponseListener;
@ -70,63 +72,70 @@ public class HttpTransport extends BaseTransport {
// flush after putting request into requests map
if (channel.isWritable()) {
channel.writeAndFlush(fullHttpRequest);
client.getRequestCounter().incrementAndGet();
}
return this;
}
@Override
public void responseReceived(Channel channel, Integer streamId, HttpResponse httpResponse) {
public void responseReceived(Channel channel, Integer streamId, FullHttpResponse fullHttpResponse) {
if (throwable != null) {
logger.log(Level.WARNING, "throwable not null for response " + httpResponse, throwable);
logger.log(Level.WARNING, "throwable not null", throwable);
return;
}
if (requests.isEmpty()) {
logger.log(Level.WARNING, "no request present for responding");
return;
}
// streamID is expected to be null, last request on memory is expected to be current, remove request from memory
Request request = requests.remove(requests.lastKey());
if (request != null) {
StatusListener statusListener = request.getStatusListener();
if (statusListener != null) {
statusListener.onStatus(httpResponse.getStatus());
}
for (String cookieString : httpResponse.getHeaders().getAllHeaders(HttpHeaderNames.SET_COOKIE)) {
Cookie cookie = ClientCookieDecoder.STRICT.decode(cookieString);
addCookie(cookie);
CookieListener cookieListener = request.getCookieListener();
if (cookieListener != null) {
cookieListener.onCookie(cookie);
}
}
ResponseListener<HttpResponse> responseListener = request.getResponseListener();
if (responseListener != null) {
responseListener.onResponse(httpResponse);
}
}
HttpResponse httpResponse = new DefaultHttpResponse(httpAddress, fullHttpResponse);
client.getResponseCounter().incrementAndGet();
try {
Request retryRequest = retry(request, httpResponse);
if (retryRequest != null) {
// retry transport, wait for completion
client.retry(this, retryRequest);
} else {
Request continueRequest = continuation(request, httpResponse);
if (continueRequest != null) {
// continue with new transport, synchronous call here, wait for completion
client.continuation(this, continueRequest);
// streamID is expected to be null, last request on memory is expected to be current, remove request from memory
Request request = requests.remove(requests.lastKey());
if (request != null) {
StatusListener statusListener = request.getStatusListener();
if (statusListener != null) {
statusListener.onStatus(httpResponse.getStatus());
}
for (String cookieString : httpResponse.getHeaders().getAllHeaders(HttpHeaderNames.SET_COOKIE)) {
Cookie cookie = ClientCookieDecoder.STRICT.decode(cookieString);
addCookie(cookie);
CookieListener cookieListener = request.getCookieListener();
if (cookieListener != null) {
cookieListener.onCookie(cookie);
}
}
ResponseListener<HttpResponse> responseListener = request.getResponseListener();
if (responseListener != null) {
responseListener.onResponse(httpResponse);
}
}
} catch (URLSyntaxException | IOException e) {
logger.log(Level.WARNING, e.getMessage(), e);
}
String channelId = channel.id().toString();
Flow flow = channelFlowMap.get(channelId);
if (flow == null) {
return;
}
CompletableFuture<Boolean> promise = flow.get(flow.lastKey());
if (promise != null) {
promise.complete(true);
try {
Request retryRequest = retry(request, httpResponse);
if (retryRequest != null) {
// retry transport, wait for completion
client.retry(this, retryRequest);
} else {
Request continueRequest = continuation(request, httpResponse);
if (continueRequest != null) {
// continue with new transport, synchronous call here, wait for completion
client.continuation(this, continueRequest);
}
}
} catch (URLSyntaxException | IOException e) {
logger.log(Level.WARNING, e.getMessage(), e);
}
String channelId = channel.id().toString();
Flow flow = channelFlowMap.get(channelId);
if (flow == null) {
return;
}
CompletableFuture<Boolean> promise = flow.get(flow.lastKey());
if (promise != null) {
promise.complete(true);
}
} finally {
httpResponse.release();
}
}

View file

@ -1,6 +1,7 @@
package org.xbib.netty.http.client.transport;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.util.AttributeKey;
@ -29,7 +30,7 @@ public interface Transport {
void settingsReceived(Http2Settings http2Settings) throws IOException;
void responseReceived(Channel channel, Integer streamId, HttpResponse fullHttpResponse) throws IOException;
void responseReceived(Channel channel, Integer streamId, FullHttpResponse fullHttpResponse) throws IOException;
void pushPromiseReceived(Channel channel, Integer streamId, Integer promisedStreamId, Http2Headers headers);

View file

@ -8,19 +8,26 @@ import org.xbib.netty.http.client.Request;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.Provider;
import java.util.logging.Level;
import java.util.logging.Logger;
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
class ConscryptTest {
private static final Logger logger = Logger.getLogger(ConscryptTest.class.getName());
@Test
void testConscrypt() throws IOException {
Provider provider = Conscrypt.newProviderBuilder()
.provideTrustManager(true)
.build();
Client client = Client.builder()
.setJdkSslProvider()
.setSslContextProvider(Conscrypt.newProvider())
.setSslContextProvider(provider)
.setTlsProtocols(new String[]{"TLSv1.2"}) // disable TLSv1.3 for Conscrypt
.build();
logger.log(Level.INFO, client.getClientConfig().toString());
try {

View file

@ -10,7 +10,7 @@ import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
class CookieSetterHttpBinTest {
private static final Logger logger = Logger.getLogger(CookieSetterHttpBinTest.class.getName());

View file

@ -11,7 +11,7 @@ import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
class Http1Test {
private static final Logger logger = Logger.getLogger(Http1Test.class.getName());
@ -22,7 +22,7 @@ class Http1Test {
.build();
try {
Request request = Request.get().url("http://xbib.org").build()
.setResponseListener(resp -> logger.log(Level.INFO,
.setResponseListener(resp -> logger.log(Level.FINE,
"got response: " + resp.getHeaders() +
resp.getBodyAsString(StandardCharsets.UTF_8) +
" status=" + resp.getStatus()));
@ -38,12 +38,12 @@ class Http1Test {
.build();
try {
Request request1 = Request.get().url("http://xbib.org").build()
.setResponseListener(resp -> logger.log(Level.INFO, "got response: " +
.setResponseListener(resp -> logger.log(Level.FINE, "got response: " +
resp.getBodyAsString(StandardCharsets.UTF_8)));
client.execute(request1).get();
Request request2 = Request.get().url("http://google.com").setVersion("HTTP/1.1").build()
.setResponseListener(resp -> logger.log(Level.INFO, "got response: " +
.setResponseListener(resp -> logger.log(Level.FINE, "got response: " +
resp.getBodyAsString(StandardCharsets.UTF_8)));
client.execute(request2).get();
} finally {
@ -59,12 +59,12 @@ class Http1Test {
Request request1 = Request.builder(HttpMethod.GET)
.url("http://xbib.org").setVersion("HTTP/1.1")
.build()
.setResponseListener(resp -> logger.log(Level.INFO, "got response: " +
.setResponseListener(resp -> logger.log(Level.FINE, "got response: " +
resp.getHeaders() + " status=" +resp.getStatus()));
Request request2 = Request.builder(HttpMethod.GET)
.url("http://xbib.org").setVersion("HTTP/1.1")
.build()
.setResponseListener(resp -> logger.log(Level.INFO, "got response: " +
.setResponseListener(resp -> logger.log(Level.FINE, "got response: " +
resp.getHeaders() + " status=" +resp.getStatus()));
for (int i = 0; i < 10; i++) {

View file

@ -12,7 +12,7 @@ import java.util.logging.LogManager;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
public class NettyHttpExtension implements BeforeAllCallback {
public class NettyHttpTestExtension implements BeforeAllCallback {
@Override
public void beforeAll(ExtensionContext context) {
@ -20,6 +20,7 @@ public class NettyHttpExtension implements BeforeAllCallback {
Security.addProvider(new BouncyCastleProvider());
}
System.setProperty("io.netty.noUnsafe", Boolean.toString(true));
// System.setProperty("io.netty.leakDetection.level", "paranoid");
Level level = Level.INFO;
System.setProperty("java.util.logging.SimpleFormatter.format",
"%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$-7s [%3$s] %5$s %6$s%n");

View file

@ -18,15 +18,12 @@ class RequestBuilderTest {
URI uri = URI.create("http://localhost");
URI uri2 = uri.resolve("/path");
assertEquals("http://localhost/path", uri2.toString());
uri = URI.create("http://localhost/path1?a=b");
uri2 = uri.resolve("path2?c=d");
assertEquals("http://localhost/path2?c=d", uri2.toString());
URL url = URL.from("http://localhost");
URL url2 = url.resolve("/path");
assertEquals("http://localhost/path", url2.toString());
url = URL.from("http://localhost/path1?a=b");
url2 = url.resolve("path2?c=d");
assertEquals("http://localhost/path2?c=d", url2.toString());
@ -35,13 +32,13 @@ class RequestBuilderTest {
@Test
void testRelativeUri() {
Request.Builder httpRequestBuilder = Request.get();
httpRequestBuilder.url("https://localhost").uri("/path");
httpRequestBuilder.url("https://localhost/path");
assertEquals("/path", httpRequestBuilder.build().relative());
httpRequestBuilder.uri("/foobar");
httpRequestBuilder.url("https://localhost/foobar");
assertEquals("/foobar", httpRequestBuilder.build().relative());
httpRequestBuilder.uri("/path1?a=b");
httpRequestBuilder.url("/path1?a=b");
assertEquals("/path1?a=b", httpRequestBuilder.build().relative());
httpRequestBuilder.uri("/path2?c=d");
httpRequestBuilder.url("/path2?c=d");
assertEquals("/path2?c=d", httpRequestBuilder.build().relative());
}
@ -98,7 +95,6 @@ class RequestBuilderTest {
assertEquals("?%20a%20=%20b", request.relative());
assertEquals("https://google.com? a = b", request.url().toString());
assertEquals("https://google.com?%20a%20=%20b", request.url().toExternalForm());
request = Request.get()
.url("https://google.com?%20a%20=%20b")
.build();

View file

@ -11,14 +11,15 @@ import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
class SecureHttpTest {
private static final Logger logger = Logger.getLogger(SecureHttpTest.class.getName());
@Test
void testHttp1() throws Exception {
void testHttp1WithTlsV13() throws Exception {
Client client = Client.builder()
.setTlsProtocols(new String[] { "TLSv1.3" })
.build();
try {
Request request = Request.get().url("https://www.google.com/").build()
@ -37,13 +38,14 @@ class SecureHttpTest {
.build();
try {
Request request1 = Request.get().url("https://google.com").build()
.setResponseListener(resp -> logger.log(Level.INFO, "got response: " +
.setResponseListener(resp -> logger.log(Level.INFO, "got HTTP 1.1 response: " +
resp.getBodyAsString(StandardCharsets.UTF_8)));
client.execute(request1).get();
// TODO decompression of frames
Request request2 = Request.get().url("https://google.com").setVersion("HTTP/2.0").build()
.setResponseListener(resp -> logger.log(Level.INFO, "got response: " +
resp.getBodyAsString(StandardCharsets.UTF_8)));
.setResponseListener(resp -> logger.log(Level.INFO, "got HTTP/2 response: " +
resp.getHeaders() + resp.getBodyAsString(StandardCharsets.UTF_8)));
client.execute(request2).get();
} finally {
client.shutdownGracefully();

View file

@ -11,7 +11,7 @@ import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class ThreadLeakTest {

View file

@ -4,14 +4,14 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.client.test.NettyHttpExtension;
import org.xbib.netty.http.client.test.NettyHttpTestExtension;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.logging.Level;
import java.util.logging.Logger;
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
public class AkamaiTest {
private static Logger logger = Logger.getLogger(AkamaiTest.class.getName());

View file

@ -22,6 +22,8 @@ import io.netty.util.AttributeKey;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.netty.http.client.test.NettyHttpTestExtension;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -39,30 +41,14 @@ import java.util.logging.LogManager;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
@ExtendWith(NettyHttpTestExtension.class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class SimpleHttp1Test {
class SimpleHttp1Test {
private static final Logger logger = Logger.getLogger(SimpleHttp1Test.class.getName());
static {
System.setProperty("io.netty.leakDetection.level", "paranoid");
System.setProperty("io.netty.noKeySetOptimization", Boolean.toString(true));
System.setProperty("java.util.logging.SimpleFormatter.format",
"%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$-7s [%3$s] %2$s %5$s %6$s%n");
LogManager.getLogManager().reset();
Logger rootLogger = LogManager.getLogManager().getLogger("");
Handler handler = new ConsoleHandler();
handler.setFormatter(new SimpleFormatter());
rootLogger.addHandler(handler);
rootLogger.setLevel(Level.ALL);
for (Handler h : rootLogger.getHandlers()) {
handler.setFormatter(new SimpleFormatter());
h.setLevel(Level.ALL);
}
}
@AfterAll
public void checkThreads() {
void checkThreads() {
Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
logger.log(Level.INFO, "threads = " + threadSet.size() );
threadSet.forEach( thread -> {
@ -117,16 +103,12 @@ public class SimpleHttp1Test {
private final Bootstrap bootstrap;
private final HttpResponseHandler httpResponseHandler;
private final Initializer initializer;
private final List<HttpTransport> transports;
Client() {
eventLoopGroup = new NioEventLoopGroup();
httpResponseHandler = new HttpResponseHandler();
initializer = new Initializer(httpResponseHandler);
HttpResponseHandler httpResponseHandler = new HttpResponseHandler();
Initializer initializer = new Initializer(httpResponseHandler);
bootstrap = new Bootstrap()
.group(eventLoopGroup)
.channel(NioSocketChannel.class)
@ -154,7 +136,7 @@ public class SimpleHttp1Test {
return transport;
}
synchronized void close() {
void close() {
for (HttpTransport transport : transports) {
transport.close();
}

View file

@ -5,13 +5,13 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.client.test.NettyHttpExtension;
import org.xbib.netty.http.client.test.NettyHttpTestExtension;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
class Http2PushTest {
private static final Logger logger = Logger.getLogger(Http2PushTest.class.getName());

View file

@ -6,9 +6,10 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.net.URL;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.listener.ResponseListener;
import org.xbib.netty.http.client.test.NettyHttpExtension;
import org.xbib.netty.http.client.test.NettyHttpTestExtension;
import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.common.HttpResponse;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@ -19,7 +20,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
class PooledClientTest {
private static final Logger logger = Logger.getLogger(PooledClientTest.class.getName());
@ -28,14 +29,14 @@ class PooledClientTest {
void testPooledClientWithSingleNode() throws IOException {
int loop = 10;
int threads = Runtime.getRuntime().availableProcessors();
URL url = URL.from("https://fl-test.hbz-nrw.de/app/fl");
URL url = URL.from("https://fl-test.hbz-nrw.de/");
HttpAddress httpAddress = HttpAddress.of(url, HttpVersion.valueOf("HTTP/2.0"));
Client client = Client.builder()
.addPoolNode(httpAddress)
.setPoolNodeConnectionLimit(threads)
.build();
AtomicInteger count = new AtomicInteger();
ResponseListener responseListener = resp -> {
ResponseListener<HttpResponse> responseListener = resp -> {
String response = resp.getBodyAsString(StandardCharsets.UTF_8);
count.getAndIncrement();
};

View file

@ -5,13 +5,13 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.client.test.NettyHttpExtension;
import org.xbib.netty.http.client.test.NettyHttpTestExtension;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
class WebtideTest {
private static final Logger logger = Logger.getLogger(WebtideTest.class.getName());

View file

@ -1,5 +0,0 @@
handlers = java.util.logging.ConsoleHandler
.level = FINE
java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
java.util.logging.SimpleFormatter.format = %1$tFT%1$tT.%1$tL%1$tz [%4$-11s] [%3$s] %5$s %6$s%n

View file

@ -19,9 +19,9 @@ public class DefaultHttpResponse implements HttpResponse {
public DefaultHttpResponse(HttpAddress httpAddress, FullHttpResponse fullHttpResponse) {
this.httpAddress = httpAddress;
this.fullHttpResponse = fullHttpResponse;
this.httpStatus = new HttpStatus(fullHttpResponse.status());
this.httpHeaders = new DefaultHttpHeaders(fullHttpResponse.headers());
this.fullHttpResponse = fullHttpResponse.retain();
this.httpStatus = new HttpStatus(this.fullHttpResponse.status());
this.httpHeaders = new DefaultHttpHeaders(this.fullHttpResponse.headers());
}
@Override
@ -41,7 +41,7 @@ public class DefaultHttpResponse implements HttpResponse {
@Override
public ByteBuf getBody() {
return fullHttpResponse.content().asReadOnly();
return fullHttpResponse.content();
}
@Override
@ -53,4 +53,9 @@ public class DefaultHttpResponse implements HttpResponse {
public String getBodyAsString(Charset charset) {
return getBody().toString(charset);
}
@Override
public void release() {
this.fullHttpResponse.release();
}
}

View file

@ -18,4 +18,6 @@ public interface HttpResponse {
InputStream getBodyAsStream();
String getBodyAsString(Charset charset);
void release();
}

View file

@ -1,5 +1,5 @@
package org.xbib.netty.http.common.cookie;
public enum SameSite {
STRICT, LAX
STRICT, LAX, NONE
}

View file

@ -30,13 +30,15 @@ import org.xbib.netty.http.server.transport.Transport;
import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* HTTP server.
*/
public final class Server {
public final class Server implements AutoCloseable {
private static final Logger logger = Logger.getLogger(Server.class.getName());
@ -54,6 +56,10 @@ public final class Server {
}
}
private static final AtomicLong requestCounter = new AtomicLong();
private static final AtomicLong responseCounter = new AtomicLong();
private final ServerConfig serverConfig;
private final ByteBufAllocator byteBufAllocator;
@ -136,16 +142,16 @@ public final class Server {
/**
* Returns the named server with the given name.
*
* @param name the name of the virtual host to return, or null for
* the default virtual host
* @return the virtual host with the given name, or null if it doesn't exist
* @param name the name of the virtual host to return or null for the
* default domain
* @return the virtual host with the given name or the default domain
*/
public Domain getNamedServer(String name) {
return serverConfig.getDomain(name);
}
public Domain getDefaultNamedServer() {
return serverConfig.getDefaultDomain();
Domain domain = serverConfig.getDomain(name);
if (domain == null) {
domain = serverConfig.getDefaultDomain();
}
return domain;
}
/**
@ -170,7 +176,7 @@ public final class Server {
logger.log(level, () -> "OpenSSL available: " + OpenSsl.isAvailable());
logger.log(level, () -> "OpenSSL ALPN support: " + OpenSsl.isAlpnSupported());
logger.log(level, () -> "Installed ciphers on default server: " +
(serverConfig.getAddress().isSecure() ? getDefaultNamedServer().getSslContext().cipherSuites() : ""));
(serverConfig.getAddress().isSecure() ? serverConfig.getDefaultDomain().getSslContext().cipherSuites() : ""));
logger.log(level, () -> "Local host name: " + NetworkUtils.getLocalHostName("localhost"));
logger.log(level, () -> "Parent event loop group: " + parentEventLoopGroup + " threads=" + serverConfig.getParentThreadCount());
logger.log(level, () -> "Child event loop group: " + childEventLoopGroup + " threads=" +serverConfig.getChildThreadCount());
@ -179,18 +185,49 @@ public final class Server {
logger.log(level, NetworkUtils::displayNetworkInterfaces);
}
public AtomicLong getRequestCounter() {
return requestCounter;
}
public AtomicLong getResponseCounter() {
return responseCounter;
}
public Transport newTransport(HttpVersion httpVersion) {
return httpVersion.majorVersion() == 1 ? new HttpTransport(this) : new Http2Transport(this);
}
public synchronized void shutdownGracefully() throws IOException {
logger.log(Level.FINE, "shutting down gracefully");
@Override
public void close() {
try {
shutdownGracefully();
} catch (IOException e) {
logger.log(Level.SEVERE, e.getMessage(), e);
}
}
public void shutdownGracefully() throws IOException {
shutdownGracefully(30L, TimeUnit.SECONDS);
}
public void shutdownGracefully(long amount, TimeUnit timeUnit) throws IOException {
logger.log(Level.FINE, "shutting down");
// first, shut down threads, then server socket
childEventLoopGroup.shutdownGracefully();
parentEventLoopGroup.shutdownGracefully();
childEventLoopGroup.shutdownGracefully(1L, amount, timeUnit);
try {
childEventLoopGroup.awaitTermination(amount, timeUnit);
} catch (InterruptedException e) {
throw new IOException(e);
}
parentEventLoopGroup.shutdownGracefully(1L, amount, timeUnit);
try {
childEventLoopGroup.awaitTermination(amount, timeUnit);
} catch (InterruptedException e) {
throw new IOException(e);
}
try {
if (channelFuture != null) {
// close channel and wait
// close channel and wait for unbind
channelFuture.channel().closeFuture().sync();
}
} catch (InterruptedException e) {
@ -441,5 +478,4 @@ public final class Server {
return new Server(serverConfig, byteBufAllocator, parentEventLoopGroup, childEventLoopGroup, socketChannelClass);
}
}
}

View file

@ -4,11 +4,17 @@ import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.Epoll;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.CipherSuiteFilter;
import io.netty.handler.ssl.SslProvider;
import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.common.security.SecurityUtil;
import java.security.KeyStore;
import java.security.Provider;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.net.ssl.TrustManagerFactory;
public class ServerConfig {
@ -138,6 +144,31 @@ public class ServerConfig {
*/
boolean INSTALL_HTTP_UPGRADE2 = false;
/**
* Default SSL provider.
*/
SslProvider SSL_PROVIDER = SecurityUtil.Defaults.DEFAULT_SSL_PROVIDER;
/**
* Default SSL context provider (for JDK SSL only).
*/
Provider SSL_CONTEXT_PROVIDER = null;
/**
* Transport layer security protocol versions.
*/
String[] PROTOCOLS = new String[] { "TLSv1.3", "TLSv1.2" };
/**
* Default ciphers. We care about HTTP/2.
*/
Iterable<String> CIPHERS = SecurityUtil.Defaults.DEFAULT_CIPHERS;
/**
* Default cipher suite filter.
*/
CipherSuiteFilter CIPHER_SUITE_FILTER = SecurityUtil.Defaults.DEFAULT_CIPHER_SUITE_FILTER;
}
private HttpAddress httpAddress = Defaults.ADDRESS;
@ -190,6 +221,20 @@ public class ServerConfig {
private final Map<String, Domain> domains;
private SslProvider sslProvider = Defaults.SSL_PROVIDER;
private Provider sslContextProvider = Defaults.SSL_CONTEXT_PROVIDER;
private String[] protocols = Defaults.PROTOCOLS;
private Iterable<String> ciphers = Defaults.CIPHERS;
private CipherSuiteFilter cipherSuiteFilter = Defaults.CIPHER_SUITE_FILTER;
private TrustManagerFactory trustManagerFactory = SecurityUtil.Defaults.DEFAULT_TRUST_MANAGER_FACTORY;
private KeyStore trustManagerKeyStore = null;
public ServerConfig() {
this.domains = new LinkedHashMap<>();
}
@ -425,12 +470,83 @@ public class ServerConfig {
return http2Settings;
}
public ServerConfig setTrustManagerFactory(TrustManagerFactory trustManagerFactory) {
this.trustManagerFactory = trustManagerFactory;
return this;
}
public TrustManagerFactory getTrustManagerFactory() {
return trustManagerFactory;
}
public ServerConfig setTrustManagerKeyStore(KeyStore trustManagerKeyStore) {
this.trustManagerKeyStore = trustManagerKeyStore;
return this;
}
public KeyStore getTrustManagerKeyStore() {
return trustManagerKeyStore;
}
public ServerConfig setSslProvider(SslProvider sslProvider) {
this.sslProvider = sslProvider;
return this;
}
public SslProvider getSslProvider() {
return sslProvider;
}
public ServerConfig setJdkSslProvider() {
this.sslProvider = SslProvider.JDK;
return this;
}
public ServerConfig setOpenSSLSslProvider() {
this.sslProvider = SslProvider.OPENSSL;
return this;
}
public ServerConfig setSslContextProvider(Provider sslContextProvider) {
this.sslContextProvider = sslContextProvider;
return this;
}
public Provider getSslContextProvider() {
return sslContextProvider;
}
public ServerConfig setProtocols(String[] protocols) {
this.protocols = protocols;
return this;
}
public String[] getProtocols() {
return protocols;
}
public ServerConfig setCiphers(Iterable<String> ciphers) {
this.ciphers = ciphers;
return this;
}
public Iterable<String> getCiphers() {
return ciphers;
}
public ServerConfig setCipherSuiteFilter(CipherSuiteFilter cipherSuiteFilter) {
this.cipherSuiteFilter = cipherSuiteFilter;
return this;
}
public CipherSuiteFilter getCipherSuiteFilter() {
return cipherSuiteFilter;
}
public ServerConfig putDomain(Domain domain) {
synchronized (domains) {
domains.put(domain.getName(), domain);
for (String alias : domain.getAliases()) {
domains.put(alias, domain);
}
domains.put(domain.getName(), domain);
for (String alias : domain.getAliases()) {
domains.put(alias, domain);
}
return this;
}
@ -445,11 +561,9 @@ public class ServerConfig {
}
public ServerConfig removeDomain(Domain domain) {
synchronized (domains) {
domains.remove(domain.getName());
for (String alias : domain.getAliases()) {
domains.remove(alias, domain);
}
domains.remove(domain.getName());
for (String alias : domain.getAliases()) {
domains.remove(alias, domain);
}
return this;
}

View file

@ -18,8 +18,6 @@ public interface ServerRequest {
URL getURL();
Channel getChannel();
HttpEndpointDescriptor getEndpointDescriptor();
void setContext(List<String> context);
@ -30,6 +28,8 @@ public interface ServerRequest {
Map<String, String> getPathParameters();
String getRequestURI();
HttpMethod getMethod();
HttpHeaders getHeaders();
@ -44,7 +44,7 @@ public interface ServerRequest {
Integer getStreamId();
Integer getRequestId();
Long getRequestId();
SSLSession getSession();

View file

@ -66,10 +66,18 @@ public interface ServerResponse {
}
static void write(ServerResponse serverResponse, HttpResponseStatus status, String contentType, String text) {
ByteBuf byteBuf = ByteBufUtil.writeUtf8(serverResponse.getChannelHandlerContext().alloc(), text);
serverResponse.withStatus(status)
.withContentType(contentType)
.withCharset(StandardCharsets.UTF_8)
.write(ByteBufUtil.writeUtf8(serverResponse.getChannelHandlerContext().alloc(), text));
.write(byteBuf);
}
static void write(ServerResponse serverResponse, HttpResponseStatus status, String contentType, ByteBuf byteBuf) {
serverResponse.withStatus(status)
.withContentType(contentType)
.withCharset(StandardCharsets.UTF_8)
.write(byteBuf);
}
static void write(ServerResponse serverResponse,
@ -79,10 +87,11 @@ public interface ServerResponse {
static void write(ServerResponse serverResponse, HttpResponseStatus status, String contentType,
CharBuffer charBuffer, Charset charset) {
ByteBuf byteBuf = ByteBufUtil.encodeString(serverResponse.getChannelHandlerContext().alloc(), charBuffer, charset);
serverResponse.withStatus(status)
.withContentType(contentType)
.withCharset(charset)
.write(ByteBufUtil.encodeString(serverResponse.getChannelHandlerContext().alloc(), charBuffer, charset));
.write(byteBuf);
}
String EMPTY_STRING = "";

View file

@ -13,9 +13,9 @@ public class HttpEndpointDescriptor implements EndpointDescriptor, Comparable<Ht
private final String contentType;
public HttpEndpointDescriptor(HttpServerRequest serverRequest) {
this.path = extractPath(serverRequest.getRequest().uri());
this.method = serverRequest.getRequest().method().name();
this.contentType = serverRequest.getRequest().headers().get(CONTENT_TYPE);
this.path = extractPath(serverRequest.getRequestURI());
this.method = serverRequest.getMethod().name();
this.contentType = serverRequest.getHeaders().get(CONTENT_TYPE);
}
public String getPath() {

View file

@ -0,0 +1,45 @@
package org.xbib.netty.http.server.handler;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.ssl.SniHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.Mapping;
import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.server.ServerConfig;
import java.net.InetSocketAddress;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
public class ExtendedSNIHandler extends SniHandler {
private final ServerConfig serverConfig;
private final HttpAddress httpAddress;
public ExtendedSNIHandler(Mapping<? super String, ? extends SslContext> mapping,
ServerConfig serverConfig, HttpAddress httpAddress) {
super(mapping);
this.serverConfig = serverConfig;
this.httpAddress = httpAddress;
}
@Override
protected SslHandler newSslHandler(SslContext context, ByteBufAllocator allocator) {
return newSslHandler(context, serverConfig, allocator, httpAddress);
}
private static SslHandler newSslHandler(SslContext sslContext,
ServerConfig serverConfig,
ByteBufAllocator allocator,
HttpAddress httpAddress) {
InetSocketAddress peer = httpAddress.getInetSocketAddress();
SslHandler sslHandler = sslContext.newHandler(allocator, peer.getHostName(), peer.getPort());
SSLEngine engine = sslHandler.engine();
SSLParameters params = engine.getSSLParameters();
params.setEndpointIdentificationAlgorithm("HTTPS");
engine.setSSLParameters(params);
engine.setEnabledProtocols(serverConfig.getProtocols());
return sslHandler;
}
}

View file

@ -15,13 +15,13 @@ import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SniHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.DomainNameMapping;
import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.server.Server;
import org.xbib.netty.http.server.ServerConfig;
import org.xbib.netty.http.server.handler.ExtendedSNIHandler;
import org.xbib.netty.http.server.handler.TrafficLoggingHandler;
import org.xbib.netty.http.server.transport.Transport;
@ -39,7 +39,6 @@ public class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {
private final HttpAddress httpAddress;
private final HttpHandler httpHandler;
private final DomainNameMapping<SslContext> domainNameMapping;
@ -49,7 +48,6 @@ public class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {
this.server = server;
this.serverConfig = server.getServerConfig();
this.httpAddress = httpAddress;
this.httpHandler = new HttpHandler(server);
this.domainNameMapping = domainNameMapping;
}
@ -71,7 +69,8 @@ public class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {
}
private void configureEncrypted(SocketChannel channel) {
channel.pipeline().addLast("sni-handker", new SniHandler(domainNameMapping));
channel.pipeline().addLast("sni-handler",
new ExtendedSNIHandler(domainNameMapping, serverConfig, httpAddress));
configureCleartext(channel);
}
@ -92,7 +91,7 @@ public class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {
pipeline.addLast("http-server-aggregator", httpObjectAggregator);
pipeline.addLast("http-server-pipelining", new HttpPipeliningHandler(1024));
pipeline.addLast("http-server-chunked-write", new ChunkedWriteHandler());
pipeline.addLast(httpHandler);
pipeline.addLast("http-server-handler", new HttpHandler(server));
}
@Sharable
@ -108,12 +107,14 @@ public class HttpChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
logger.log(Level.FINE, "channelRead: " + msg.getClass().getName());
if (msg instanceof HttpPipelinedRequest) {
HttpPipelinedRequest httpPipelinedRequest = (HttpPipelinedRequest) msg;
if (httpPipelinedRequest.getRequest() instanceof FullHttpRequest) {
FullHttpRequest fullHttpRequest = (FullHttpRequest) httpPipelinedRequest.getRequest();
Transport transport = server.newTransport(fullHttpRequest.protocolVersion());
transport.requestReceived(ctx, fullHttpRequest, httpPipelinedRequest.getSequenceId());
fullHttpRequest.release();
}
} else {
super.channelRead(ctx, msg);

View file

@ -8,6 +8,8 @@ import io.netty.handler.codec.http.LastHttpContent;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Implements HTTP pipelining ordering, ensuring that responses are completely served in the same order as their
@ -19,6 +21,8 @@ public class HttpPipeliningHandler extends ChannelDuplexHandler {
private final int pipelineCapacity;
private final Lock lock;
private final Queue<HttpPipelinedResponse> httpPipelinedResponses;
private final AtomicInteger requestCounter;
@ -32,6 +36,7 @@ public class HttpPipeliningHandler extends ChannelDuplexHandler {
*/
public HttpPipeliningHandler(int pipelineCapacity) {
this.pipelineCapacity = pipelineCapacity;
this.lock = new ReentrantLock();
this.httpPipelinedResponses = new PriorityQueue<>(3);
this.requestCounter = new AtomicInteger();
this.writtenRequests = new AtomicInteger();
@ -48,7 +53,8 @@ public class HttpPipeliningHandler extends ChannelDuplexHandler {
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof HttpPipelinedResponse) {
boolean channelShouldClose = false;
synchronized (httpPipelinedResponses) {
lock.lock();
try {
if (httpPipelinedResponses.size() < pipelineCapacity) {
HttpPipelinedResponse currentEvent = (HttpPipelinedResponse) msg;
httpPipelinedResponses.add(currentEvent);
@ -64,6 +70,8 @@ public class HttpPipeliningHandler extends ChannelDuplexHandler {
} else {
channelShouldClose = true;
}
} finally {
lock.unlock();
}
if (channelShouldClose) {
ctx.close();

View file

@ -22,7 +22,6 @@ import io.netty.handler.codec.http2.Http2MultiplexCodecBuilder;
import io.netty.handler.codec.http2.Http2ServerUpgradeCodec;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.SniHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.AsciiString;
@ -30,6 +29,7 @@ import io.netty.util.DomainNameMapping;
import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.server.Server;
import org.xbib.netty.http.server.ServerConfig;
import org.xbib.netty.http.server.handler.ExtendedSNIHandler;
import org.xbib.netty.http.server.handler.TrafficLoggingHandler;
import org.xbib.netty.http.server.transport.Transport;
@ -76,7 +76,8 @@ public class Http2ChannelInitializer extends ChannelInitializer<Channel> {
}
private void configureEncrypted(Channel channel) {
channel.pipeline().addLast("sni-handler", new SniHandler(domainNameMapping));
channel.pipeline().addLast("sni-handler",
new ExtendedSNIHandler(domainNameMapping, serverConfig, httpAddress));
configureCleartext(channel);
}
@ -123,19 +124,16 @@ public class Http2ChannelInitializer extends ChannelInitializer<Channel> {
pipeline.addLast("server-messages", new ServerMessages());
}
class ServerRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
static class ServerRequestHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest) throws IOException {
if (server.getServerConfig().isDebug() && logger.isLoggable(Level.FINE)) {
logger.log(Level.FINE, "HTTP/2 server pipeline: " + ctx.channel().pipeline().names());
}
Transport transport = ctx.channel().attr(Transport.TRANSPORT_ATTRIBUTE_KEY).get();
transport.requestReceived(ctx, fullHttpRequest);
transport.requestReceived(ctx, fullHttpRequest, null);
}
}
class ServerMessages extends ChannelInboundHandlerAdapter {
static class ServerMessages extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

View file

@ -10,8 +10,6 @@ import org.xbib.netty.http.server.ServerRequest;
import org.xbib.netty.http.server.ServerResponse;
import org.xbib.netty.http.server.Domain;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -19,8 +17,6 @@ abstract class BaseTransport implements Transport {
private static final Logger logger = Logger.getLogger(BaseTransport.class.getName());
static final AtomicInteger requestCounter = new AtomicInteger();
protected final Server server;
BaseTransport(Server server) {
@ -55,10 +51,7 @@ abstract class BaseTransport implements Transport {
// return a continue response before reading body
String expect = reqHeaders.get(HttpHeaderNames.EXPECT);
if (expect != null) {
if ("100-continue".equalsIgnoreCase(expect)) {
//ServerResponse tempResp = new ServerResponse(serverResponse);
//tempResp.sendHeaders(100);
} else {
if (!"100-continue".equalsIgnoreCase(expect)) {
// RFC2616#14.20: if unknown expect, send 417
ServerResponse.write(serverResponse, HttpResponseStatus.EXPECTATION_FAILED);
return false;
@ -71,17 +64,4 @@ abstract class BaseTransport implements Transport {
}
return true;
}
/**
* Handles a request according to the request method.
* @param domain the named server
* @param serverRequest the request
* @param serverResponse the response (into which the response is written)
* @throws IOException if and error occurs
*/
static void handle(Domain domain, HttpServerRequest serverRequest, ServerResponse serverResponse) throws IOException {
// create server URL and parse parameters from query string, path, and parse body, if exists
serverRequest.handleParameters();
domain.handle(serverRequest, serverResponse);
}
}

View file

@ -18,6 +18,7 @@ import io.netty.handler.codec.http2.Http2HeadersFrame;
import io.netty.handler.codec.http2.HttpConversionUtil;
import io.netty.handler.stream.ChunkedInput;
import org.xbib.netty.http.common.cookie.Cookie;
import org.xbib.netty.http.server.Server;
import org.xbib.netty.http.server.ServerName;
import org.xbib.netty.http.server.ServerRequest;
import org.xbib.netty.http.server.ServerResponse;
@ -35,6 +36,8 @@ public class Http2ServerResponse implements ServerResponse {
private static final Logger logger = Logger.getLogger(Http2ServerResponse.class.getName());
private final Server server;
private final ServerRequest serverRequest;
private final ChannelHandlerContext ctx;
@ -43,13 +46,10 @@ public class Http2ServerResponse implements ServerResponse {
private HttpResponseStatus httpResponseStatus;
private ByteBufOutputStream byteBufOutputStream;
public Http2ServerResponse(HttpServerRequest serverRequest) {
Objects.requireNonNull(serverRequest);
Objects.requireNonNull(serverRequest.getChannelHandlerContext());
Http2ServerResponse(Server server, HttpServerRequest serverRequest, ChannelHandlerContext ctx) {
this.server = server;
this.serverRequest = serverRequest;
this.ctx = serverRequest.getChannelHandlerContext();
this.ctx = ctx;
this.headers = new DefaultHttp2Headers();
}
@ -101,8 +101,7 @@ public class Http2ServerResponse implements ServerResponse {
@Override
public ByteBufOutputStream getOutputStream() {
this.byteBufOutputStream = new ByteBufOutputStream(ctx.alloc().buffer());
return byteBufOutputStream;
return new ByteBufOutputStream(ctx.alloc().buffer());
}
@Override
@ -159,6 +158,9 @@ public class Http2ServerResponse implements ServerResponse {
ctx.channel().write(http2DataFrame);
}
ctx.channel().flush();
server.getResponseCounter().incrementAndGet();
} else {
logger.log(Level.WARNING, "channel is not writeable: " + ctx.channel());
}
}
@ -185,15 +187,15 @@ public class Http2ServerResponse implements ServerResponse {
if (ctx.channel().isWritable()) {
Http2Headers http2Headers = new DefaultHttp2Headers().status(httpResponseStatus.codeAsText()).add(headers);
Http2HeadersFrame http2HeadersFrame = new DefaultHttp2HeadersFrame(http2Headers,false);
logger.log(Level.FINEST, http2HeadersFrame::toString);
ctx.channel().write(http2HeadersFrame);
ChannelFuture channelFuture = ctx.channel().writeAndFlush(new HttpChunkedInput(chunkedInput));
if ("close".equalsIgnoreCase(serverRequest.getHeaders().get(HttpHeaderNames.CONNECTION)) &&
!headers.contains(HttpHeaderNames.CONNECTION)) {
channelFuture.addListener(ChannelFutureListener.CLOSE);
}
server.getResponseCounter().incrementAndGet();
} else {
logger.log(Level.WARNING, "channel not writeable");
logger.log(Level.WARNING, "channel is not writeable: " + ctx.channel());
}
}
}

View file

@ -11,41 +11,39 @@ import org.xbib.netty.http.server.ServerResponse;
import org.xbib.netty.http.server.Domain;
import java.io.IOException;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Http2Transport extends BaseTransport {
private static final Logger logger = Logger.getLogger(Http2Transport.class.getName());
public Http2Transport(Server server) {
super(server);
}
@Override
public void requestReceived(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest) throws IOException {
requestReceived(ctx, fullHttpRequest, null);
}
@Override
public void requestReceived(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest, Integer sequenceId) throws IOException {
int requestId = requestCounter.incrementAndGet();
Domain domain = server.getNamedServer(fullHttpRequest.headers().get(HttpHeaderNames.HOST));
if (domain == null) {
domain = server.getDefaultNamedServer();
}
Integer streamId = fullHttpRequest.headers().getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text());
HttpServerRequest serverRequest = new HttpServerRequest();
serverRequest.setChannelHandlerContext(ctx);
serverRequest.setRequest(fullHttpRequest);
serverRequest.setSequenceId(sequenceId);
serverRequest.setRequestId(requestId);
serverRequest.setStreamId(streamId);
ServerResponse serverResponse = new Http2ServerResponse(serverRequest);
if (acceptRequest(domain, serverRequest, serverResponse)) {
handle(domain, serverRequest, serverResponse);
} else {
ServerResponse.write(serverResponse, HttpResponseStatus.NOT_ACCEPTABLE);
HttpServerRequest serverRequest = new HttpServerRequest(server, fullHttpRequest);
try {
serverRequest.setSequenceId(sequenceId);
serverRequest.setRequestId(server.getRequestCounter().incrementAndGet());
serverRequest.setStreamId(fullHttpRequest.headers().getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text()));
ServerResponse serverResponse = new Http2ServerResponse(server, serverRequest, ctx);
if (acceptRequest(domain, serverRequest, serverResponse)) {
serverRequest.handleParameters();
domain.handle(serverRequest, serverResponse);
} else {
ServerResponse.write(serverResponse, HttpResponseStatus.NOT_ACCEPTABLE);
}
} finally {
serverRequest.release();
}
}
@Override
public void settingsReceived(ChannelHandlerContext ctx, Http2Settings http2Settings) {
logger.log(Level.FINER, "settings received, ignoring");
}
}

View file

@ -2,7 +2,6 @@ package org.xbib.netty.http.server.transport;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaders;
@ -12,12 +11,12 @@ import org.xbib.net.Pair;
import org.xbib.net.QueryParameters;
import org.xbib.net.URL;
import org.xbib.netty.http.common.HttpParameters;
import org.xbib.netty.http.server.Server;
import org.xbib.netty.http.server.ServerRequest;
import org.xbib.netty.http.server.endpoint.HttpEndpointDescriptor;
import javax.net.ssl.SSLSession;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.MalformedInputException;
import java.nio.charset.StandardCharsets;
import java.nio.charset.UnmappableCharacterException;
@ -34,7 +33,7 @@ public class HttpServerRequest implements ServerRequest {
private static final CharSequence APPLICATION_FORM_URL_ENCODED = "application/x-www-form-urlencoded";
private ChannelHandlerContext ctx;
private final FullHttpRequest httpRequest;
private List<String> context;
@ -42,9 +41,7 @@ public class HttpServerRequest implements ServerRequest {
private Map<String, String> pathParameters = new LinkedHashMap<>();
private FullHttpRequest httpRequest;
private HttpEndpointDescriptor info;
private HttpEndpointDescriptor httpEndpointDescriptor;
private HttpParameters parameters;
@ -54,15 +51,19 @@ public class HttpServerRequest implements ServerRequest {
private Integer streamId;
private Integer requestId;
private Long requestId;
private SSLSession sslSession;
public void handleParameters() throws IOException {
HttpServerRequest(Server server, FullHttpRequest fullHttpRequest) {
this.httpRequest = fullHttpRequest.retainedDuplicate();
this.httpEndpointDescriptor = new HttpEndpointDescriptor(this);
}
void handleParameters() throws IOException {
try {
HttpParameters httpParameters = new HttpParameters();
URL.Builder builder = URL.builder().path(getRequest().uri());
this.url = builder.build();
this.url = URL.builder().path(httpRequest.uri()).build();
QueryParameters queryParameters = url.getQueryParams();
ByteBuf byteBuf = httpRequest.content();
if (APPLICATION_FORM_URL_ENCODED.equals(HttpUtil.getMimeType(httpRequest)) && byteBuf != null) {
@ -78,36 +79,14 @@ public class HttpServerRequest implements ServerRequest {
}
}
public void setChannelHandlerContext(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
public ChannelHandlerContext getChannelHandlerContext() {
return ctx;
}
public void setRequest(FullHttpRequest fullHttpRequest) {
this.httpRequest = fullHttpRequest;
this.info = new HttpEndpointDescriptor(this);
}
public FullHttpRequest getRequest() {
return httpRequest;
}
@Override
public URL getURL() {
return url;
}
@Override
public Channel getChannel() {
return ctx.channel();
}
@Override
public HttpEndpointDescriptor getEndpointDescriptor() {
return info;
return httpEndpointDescriptor;
}
@Override
@ -160,6 +139,11 @@ public class HttpServerRequest implements ServerRequest {
return parameters;
}
@Override
public String getRequestURI() {
return httpRequest.uri();
}
public void setSequenceId(Integer sequenceId) {
this.sequenceId = sequenceId;
}
@ -178,12 +162,12 @@ public class HttpServerRequest implements ServerRequest {
return streamId;
}
public void setRequestId(Integer requestId) {
public void setRequestId(Long requestId) {
this.requestId = requestId;
}
@Override
public Integer getRequestId() {
public Long getRequestId() {
return requestId;
}
@ -206,6 +190,10 @@ public class HttpServerRequest implements ServerRequest {
return new ByteBufInputStream(getContent(), true);
}
public void release() {
httpRequest.release();
}
public String toString() {
return "ServerRequest[request=" + httpRequest + "]";
}

View file

@ -19,13 +19,13 @@ import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.stream.ChunkedInput;
import org.xbib.netty.http.common.cookie.Cookie;
import org.xbib.netty.http.server.Server;
import org.xbib.netty.http.server.ServerName;
import org.xbib.netty.http.server.ServerRequest;
import org.xbib.netty.http.server.ServerResponse;
import org.xbib.netty.http.server.cookie.ServerCookieEncoder;
import org.xbib.netty.http.server.handler.http.HttpPipelinedResponse;
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
@ -38,6 +38,10 @@ public class HttpServerResponse implements ServerResponse {
private static final Logger logger = Logger.getLogger(HttpServerResponse.class.getName());
private static final ByteBuf EMPTY = Unpooled.buffer(0);
private final Server server;
private final ServerRequest serverRequest;
private final ChannelHandlerContext ctx;
@ -48,13 +52,10 @@ public class HttpServerResponse implements ServerResponse {
private HttpResponseStatus httpResponseStatus;
private ByteBufOutputStream byteBufOutputStream;
public HttpServerResponse(HttpServerRequest serverRequest) {
Objects.requireNonNull(serverRequest, "serverRequest");
Objects.requireNonNull(serverRequest.getChannelHandlerContext(), "serverRequest channelHandlerContext");
HttpServerResponse(Server server, HttpServerRequest serverRequest, ChannelHandlerContext ctx) {
this.server = server;
this.serverRequest = serverRequest;
this.ctx = serverRequest.getChannelHandlerContext();
this.ctx = ctx;
this.headers = new DefaultHttpHeaders();
this.trailingHeaders = new DefaultHttpHeaders();
}
@ -107,8 +108,7 @@ public class HttpServerResponse implements ServerResponse {
@Override
public ByteBufOutputStream getOutputStream() {
this.byteBufOutputStream = new ByteBufOutputStream(ctx.alloc().buffer());
return byteBufOutputStream;
return new ByteBufOutputStream(ctx.alloc().buffer());
}
@Override
@ -153,18 +153,23 @@ public class HttpServerResponse implements ServerResponse {
}
headers.add(HttpHeaderNames.SERVER, ServerName.getServerName());
if (ctx.channel().isWritable()) {
FullHttpResponse fullHttpResponse = byteBuf != null ?
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus, byteBuf, headers, trailingHeaders) :
new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus, Unpooled.buffer(0), headers, trailingHeaders);
FullHttpResponse fullHttpResponse;
if (byteBuf != null) {
fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus, byteBuf, headers, trailingHeaders);
} else {
fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus, EMPTY, headers, trailingHeaders);
}
if (serverRequest != null && serverRequest.getSequenceId() != null) {
HttpPipelinedResponse httpPipelinedResponse = new HttpPipelinedResponse(fullHttpResponse,
ctx.channel().newPromise(), serverRequest.getSequenceId());
ctx.channel().writeAndFlush(httpPipelinedResponse);
server.getResponseCounter().incrementAndGet();
} else {
ctx.channel().writeAndFlush(fullHttpResponse);
server.getResponseCounter().incrementAndGet();
}
} else {
logger.log(Level.WARNING, "channel not writeable");
logger.log(Level.WARNING, "channel not writeable: " + ctx.channel());
}
}

View file

@ -18,33 +18,27 @@ public class HttpTransport extends BaseTransport {
super(server);
}
@Override
public void requestReceived(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest) throws IOException {
requestReceived(ctx, fullHttpRequest, 0);
}
@Override
public void requestReceived(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest, Integer sequenceId)
throws IOException {
int requestId = requestCounter.incrementAndGet();
Domain domain = server.getNamedServer(fullHttpRequest.headers().get(HttpHeaderNames.HOST));
if (domain == null) {
domain = server.getDefaultNamedServer();
}
HttpServerRequest serverRequest = new HttpServerRequest();
serverRequest.setChannelHandlerContext(ctx);
serverRequest.setRequest(fullHttpRequest);
serverRequest.setSequenceId(sequenceId);
serverRequest.setRequestId(requestId);
SslHandler sslHandler = ctx.channel().pipeline().get(SslHandler.class);
if (sslHandler != null) {
serverRequest.setSession(sslHandler.engine().getSession());
}
HttpServerResponse serverResponse = new HttpServerResponse(serverRequest);
if (acceptRequest(domain, serverRequest, serverResponse)) {
handle(domain, serverRequest, serverResponse);
} else {
ServerResponse.write(serverResponse, HttpResponseStatus.NOT_ACCEPTABLE);
HttpServerRequest serverRequest = new HttpServerRequest(server, fullHttpRequest);
try {
serverRequest.setSequenceId(sequenceId);
serverRequest.setRequestId(server.getRequestCounter().incrementAndGet());
SslHandler sslHandler = ctx.channel().pipeline().get(SslHandler.class);
if (sslHandler != null) {
serverRequest.setSession(sslHandler.engine().getSession());
}
HttpServerResponse serverResponse = new HttpServerResponse(server, serverRequest, ctx);
if (acceptRequest(domain, serverRequest, serverResponse)) {
serverRequest.handleParameters();
domain.handle(serverRequest, serverResponse);
} else {
ServerResponse.write(serverResponse, HttpResponseStatus.NOT_ACCEPTABLE);
}
} finally {
serverRequest.release();
}
}

View file

@ -11,8 +11,6 @@ public interface Transport {
AttributeKey<Transport> TRANSPORT_ATTRIBUTE_KEY = AttributeKey.valueOf("transport");
void requestReceived(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest) throws IOException;
void requestReceived(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest, Integer sequenceId) throws IOException;
void settingsReceived(ChannelHandlerContext ctx, Http2Settings http2Settings) throws Exception;

View file

@ -18,7 +18,7 @@ import java.util.logging.Logger;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
class ClassloaderServiceTest {
private static final Logger logger = Logger.getLogger(ClassloaderServiceTest.class.getName());

View file

@ -12,7 +12,9 @@ import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.common.HttpResponse;
import org.xbib.netty.http.server.Server;
import org.xbib.netty.http.server.Domain;
import org.xbib.netty.http.server.ServerResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@ -22,7 +24,7 @@ import java.util.logging.Logger;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
class CleartextHttp1Test {
private static final Logger logger = Logger.getLogger(CleartextHttp1Test.class.getName());
@ -32,9 +34,8 @@ class CleartextHttp1Test {
HttpAddress httpAddress = HttpAddress.http1("localhost", 8008);
Domain domain = Domain.builder(httpAddress)
.singleEndpoint("/**", (request, response) ->
response.withStatus(HttpResponseStatus.OK)
.withContentType("text/plain")
.write(request.getContent().retain()))
ServerResponse.write(response, HttpResponseStatus.OK, "text/plain",
request.getContent().retain()))
.build();
Server server = Server.builder(domain).build();
server.accept();
@ -43,6 +44,7 @@ class CleartextHttp1Test {
AtomicInteger counter = new AtomicInteger();
final ResponseListener<HttpResponse> responseListener = resp -> {
if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) {
logger.log(Level.INFO, resp.getBodyAsString(StandardCharsets.UTF_8));
counter.incrementAndGet();
}
};
@ -62,13 +64,12 @@ class CleartextHttp1Test {
@Test
void testPooledClearTextHttp1() throws Exception {
int loop = 4096;
int loop = 1000;
HttpAddress httpAddress = HttpAddress.http1("localhost", 8008);
Domain domain = Domain.builder(httpAddress)
.singleEndpoint("/**", (request, response) ->
response.withStatus(HttpResponseStatus.OK)
.withContentType("text/plain")
.write(request.getContent().retain()))
ServerResponse.write(response, HttpResponseStatus.OK, "text/plain",
request.getContent().retain()))
.build();
Server server = Server.builder(domain).build();
server.accept();
@ -79,6 +80,7 @@ class CleartextHttp1Test {
AtomicInteger counter = new AtomicInteger();
final ResponseListener<HttpResponse> responseListener = resp -> {
if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) {
logger.log(Level.INFO, resp.getBodyAsString(StandardCharsets.UTF_8));
counter.incrementAndGet();
}
};
@ -106,14 +108,13 @@ class CleartextHttp1Test {
@Test
void testMultithreadedPooledClearTextHttp1() throws Exception {
int threads = 4;
int loop = 4 * 1024;
int threads = 8;
int loop = 1000;
HttpAddress httpAddress = HttpAddress.http1("localhost", 8008);
Domain domain = Domain.builder(httpAddress)
.singleEndpoint("/**", (request, response) ->
response.withStatus(HttpResponseStatus.OK)
.withContentType("text/plain")
.write(request.getContent().retain()))
ServerResponse.write(response, HttpResponseStatus.OK, "text/plain",
request.getContent().retain()))
.build();
Server server = Server.builder(domain).build();
server.accept();
@ -124,6 +125,7 @@ class CleartextHttp1Test {
AtomicInteger counter = new AtomicInteger();
final ResponseListener<HttpResponse> responseListener = resp -> {
if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) {
logger.log(Level.FINE, resp.getBodyAsString(StandardCharsets.UTF_8));
counter.incrementAndGet();
}
};
@ -134,7 +136,7 @@ class CleartextHttp1Test {
executorService.submit(() -> {
try {
for (int i = 0; i < loop; i++) {
String payload = Integer.toString(t) + "/" + Integer.toString(i);
String payload = t + "/" + i;
Request request = Request.get().setVersion(HttpVersion.HTTP_1_1)
.url(server.getServerConfig().getAddress().base())
.content(payload, "text/plain")

View file

@ -13,7 +13,6 @@ import org.xbib.netty.http.server.Server;
import org.xbib.netty.http.server.ServerResponse;
import org.xbib.netty.http.server.Domain;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -24,7 +23,7 @@ import java.util.logging.Logger;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
class CleartextHttp2Test {
private static final Logger logger = Logger.getLogger(CleartextHttp2Test.class.getName());
@ -34,20 +33,23 @@ class CleartextHttp2Test {
HttpAddress httpAddress = HttpAddress.http2("localhost", 8008);
Domain domain = Domain.builder(httpAddress)
.singleEndpoint("/", (request, response) ->
response.withStatus(HttpResponseStatus.OK)
.withContentType("text/plain")
.write(request.getContent().retain()))
ServerResponse.write(response, HttpResponseStatus.OK, "text.plain",
request.getContent().toString(StandardCharsets.UTF_8)))
.build();
Server server = Server.builder(domain)
.build();
Server server = Server.builder(domain).build();
server.accept();
Client client = Client.builder()
.build();
AtomicInteger counter = new AtomicInteger();
// a single instance of HTTP/2 response listener, always receives responses out-of-order
ResponseListener<HttpResponse> responseListener = resp -> {
logger.log(Level.INFO, "response listener: headers = " + resp.getHeaders() +
" response body = " + resp.getBodyAsString(StandardCharsets.UTF_8));
counter.incrementAndGet();
if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) {
counter.incrementAndGet();
} else {
logger.log(Level.INFO, "response listener: headers = " + resp.getHeaders() +
" response body = " + resp.getBodyAsString(StandardCharsets.UTF_8));
}
};
try {
String payload = 0 + "/" + 0;
@ -63,23 +65,25 @@ class CleartextHttp2Test {
}
transport.get();
} finally {
client.shutdownGracefully();
server.shutdownGracefully();
client.shutdownGracefully();
}
logger.log(Level.INFO, "expecting=" + 1 + " counter=" + counter.get());
assertEquals(1, counter.get());
}
@Test
void testPooledClearTextHttp2() throws Exception {
int loop = 4096;
int loop = 1000;
HttpAddress httpAddress = HttpAddress.http2("localhost", 8008);
Domain domain = Domain.builder(httpAddress)
.singleEndpoint("/", (request, response) ->
response.withStatus(HttpResponseStatus.OK)
.withContentType("text/plain")
.write(request.getContent().retain()))
response.withStatus(HttpResponseStatus.OK)
.withContentType("text/plain")
.write(request.getContent().retain()))
.build();
Server server = Server.builder(domain)
.build();
Server server = Server.builder(domain).build();
server.accept();
Client client = Client.builder()
.addPoolNode(httpAddress)
@ -87,7 +91,14 @@ class CleartextHttp2Test {
.build();
AtomicInteger counter = new AtomicInteger();
// a single instance of HTTP/2 response listener, always receives responses out-of-order
final ResponseListener<HttpResponse> responseListener = resp -> counter.incrementAndGet();
final ResponseListener<HttpResponse> responseListener = resp -> {
if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) {
counter.incrementAndGet();
} else {
logger.log(Level.INFO, "response listener: headers = " + resp.getHeaders() +
" response body = " + resp.getBodyAsString(StandardCharsets.UTF_8));
}
};
try {
// single transport, single thread
Transport transport = client.newTransport();
@ -106,8 +117,8 @@ class CleartextHttp2Test {
}
transport.get();
} finally {
client.shutdownGracefully();
server.shutdownGracefully();
client.shutdownGracefully();
}
logger.log(Level.INFO, "expecting=" + loop + " counter=" + counter.get());
assertEquals(loop, counter.get());
@ -116,14 +127,15 @@ class CleartextHttp2Test {
@Test
void testMultithreadPooledClearTextHttp2() throws Exception {
int threads = 2;
int loop = 2 * 1024;
int loop = 2000;
HttpAddress httpAddress = HttpAddress.http2("localhost", 8008);
Domain domain = Domain.builder(httpAddress)
.singleEndpoint("/", (request, response) ->
ServerResponse.write(response, HttpResponseStatus.OK, "text/plain",
request.getContent().toString(StandardCharsets.UTF_8)))
request.getContent().retain()))
.build();
Server server = Server.builder(domain)
.build();
Server server = Server.builder(domain).build();
server.accept();
Client client = Client.builder()
.addPoolNode(httpAddress)
@ -131,9 +143,16 @@ class CleartextHttp2Test {
.build();
AtomicInteger counter = new AtomicInteger();
// a HTTP/2 listener always receives responses out-of-order
final ResponseListener<HttpResponse> responseListener = resp -> counter.incrementAndGet();
final ResponseListener<HttpResponse> responseListener = resp -> {
if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) {
counter.incrementAndGet();
} else {
logger.log(Level.INFO, "response listener: headers = " + resp.getHeaders() +
" response body = " + resp.getBodyAsString(StandardCharsets.UTF_8));
}
};
try {
// note: for HTTP/2 only, we can use a single shared transport
// note: for HTTP/2 only, we use a single shared transport
final Transport transport = client.newTransport();
ExecutorService executorService = Executors.newFixedThreadPool(threads);
for (int n = 0; n < threads; n++) {
@ -153,19 +172,24 @@ class CleartextHttp2Test {
break;
}
}
} catch (IOException e) {
} catch (Throwable e) {
logger.log(Level.WARNING, e.getMessage(), e);
}
});
}
executorService.shutdown();
boolean terminated = executorService.awaitTermination(60, TimeUnit.SECONDS);
boolean terminated = executorService.awaitTermination(10L, TimeUnit.SECONDS);
executorService.shutdownNow();
logger.log(Level.INFO, "terminated = " + terminated + ", now waiting for transport to complete");
transport.get(60, TimeUnit.SECONDS);
transport.get(10L, TimeUnit.SECONDS);
} finally {
client.shutdownGracefully();
server.shutdownGracefully();
server.shutdownGracefully(10L, TimeUnit.SECONDS);
client.shutdownGracefully(10L, TimeUnit.SECONDS);
}
logger.log(Level.INFO, "server requests = " + server.getRequestCounter() +
" server responses = " + server.getResponseCounter());
logger.log(Level.INFO, "client requests = " + client.getRequestCounter() +
" client responses = " + client.getResponseCounter());
logger.log(Level.INFO, "expected=" + (threads * loop) + " counter=" + counter.get());
assertEquals(threads * loop , counter.get());
}
@ -173,18 +197,18 @@ class CleartextHttp2Test {
@Test
void testTwoPooledClearTextHttp2() throws Exception {
int threads = 2;
int loop = 4 * 1024;
int loop = 4000;
HttpAddress httpAddress1 = HttpAddress.http2("localhost", 8008);
AtomicInteger counter1 = new AtomicInteger();
Domain domain1 = Domain.builder(httpAddress1)
.singleEndpoint("/", (request, response) -> {
ServerResponse.write(response, HttpResponseStatus.OK, "text/plain",
request.getContent().toString(StandardCharsets.UTF_8));
counter1.incrementAndGet();
ServerResponse.write(response, HttpResponseStatus.OK, "text.plain",
request.getContent().toString(StandardCharsets.UTF_8));
counter1.incrementAndGet();
})
.build();
Server server1 = Server.builder(domain1).build();
Server server1 = Server.builder(domain1)
.build();
server1.accept();
HttpAddress httpAddress2 = HttpAddress.http2("localhost", 8009);
AtomicInteger counter2 = new AtomicInteger();
@ -195,7 +219,8 @@ class CleartextHttp2Test {
counter2.incrementAndGet();
})
.build();
Server server2 = Server.builder(domain2).build();
Server server2 = Server.builder(domain2)
.build();
server2.accept();
Client client = Client.builder()
.addPoolNode(httpAddress1)
@ -204,7 +229,14 @@ class CleartextHttp2Test {
.build();
AtomicInteger counter = new AtomicInteger();
// a single instance of HTTP/2 response listener, always receives responses out-of-order
final ResponseListener<HttpResponse> responseListener = resp -> counter.incrementAndGet();
final ResponseListener<HttpResponse> responseListener = resp -> {
if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) {
counter.incrementAndGet();
} else {
logger.log(Level.INFO, "response listener: headers = " + resp.getHeaders() +
" response body = " + resp.getBodyAsString(StandardCharsets.UTF_8));
}
};
try {
// note: for HTTP/2 only, we can use a single shared transport
final Transport transport = client.newTransport();
@ -215,7 +247,9 @@ class CleartextHttp2Test {
try {
for (int i = 0; i < loop; i++) {
String payload = t + "/" + i;
Request request = Request.get().setVersion("HTTP/2.0")
Request request = Request.get()
.setVersion("HTTP/2.0")
//.url(server1.getServerConfig().getAddress().base())
.uri("/")
.content(payload, "text/plain")
.build()
@ -226,20 +260,27 @@ class CleartextHttp2Test {
break;
}
}
} catch (IOException e) {
} catch (Throwable e) {
logger.log(Level.WARNING, e.getMessage(), e);
}
});
}
executorService.shutdown();
boolean terminated = executorService.awaitTermination(60, TimeUnit.SECONDS);
boolean terminated = executorService.awaitTermination(10L, TimeUnit.SECONDS);
logger.log(Level.INFO, "terminated = " + terminated + ", now waiting for transport to complete");
transport.get(60, TimeUnit.SECONDS);
transport.get(10L, TimeUnit.SECONDS);
logger.log(Level.INFO, "transport complete");
} finally {
client.shutdownGracefully();
server1.shutdownGracefully();
server2.shutdownGracefully();
server1.shutdownGracefully(10L, TimeUnit.SECONDS);
server2.shutdownGracefully(10L, TimeUnit.SECONDS);
client.shutdownGracefully(10L, TimeUnit.SECONDS);
}
logger.log(Level.INFO, "server1 requests = " + server1.getRequestCounter() +
" server1 responses = " + server1.getResponseCounter());
logger.log(Level.INFO, "server2 requests = " + server1.getRequestCounter() +
" server2 responses = " + server1.getResponseCounter());
logger.log(Level.INFO, "client requests = " + client.getRequestCounter() +
" client responses = " + client.getResponseCounter());
logger.log(Level.INFO, "counter1=" + counter1.get() + " counter2=" + counter2.get());
logger.log(Level.INFO, "expecting=" + threads * loop + " counter=" + counter.get());
assertEquals(threads * loop, counter.get());

View file

@ -28,7 +28,7 @@ import java.util.logging.Logger;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
class EndpointTest {
private static final Logger logger = Logger.getLogger(EndpointTest.class.getName());

View file

@ -21,7 +21,7 @@ import java.util.logging.Logger;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
class FileServiceTest {
private static final Logger logger = Logger.getLogger(FileServiceTest.class.getName());

View file

@ -12,7 +12,7 @@ import java.util.logging.LogManager;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
public class NettyHttpExtension implements BeforeAllCallback {
public class NettyHttpTestExtension implements BeforeAllCallback {
@Override
public void beforeAll(ExtensionContext context) {
@ -20,6 +20,7 @@ public class NettyHttpExtension implements BeforeAllCallback {
Security.addProvider(new BouncyCastleProvider());
}
System.setProperty("io.netty.noUnsafe", Boolean.toString(true));
//System.setProperty("io.netty.leakDetection.level", "paranoid");
Level level = Level.INFO;
System.setProperty("java.util.logging.SimpleFormatter.format",
"%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$-7s [%3$s] %5$s %6$s%n");

View file

@ -6,8 +6,10 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.netty.http.client.Client;
import org.xbib.netty.http.client.Request;
import org.xbib.netty.http.client.listener.ResponseListener;
import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.common.HttpParameters;
import org.xbib.netty.http.common.HttpResponse;
import org.xbib.netty.http.server.Server;
import org.xbib.netty.http.server.ServerResponse;
import org.xbib.netty.http.server.Domain;
@ -18,7 +20,7 @@ import java.util.logging.Logger;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
class PostTest {
private static final Logger logger = Logger.getLogger(PostTest.class.getName());
@ -29,9 +31,9 @@ class PostTest {
Domain domain = Domain.builder(httpAddress)
.singleEndpoint("/post", "/**", (req, resp) -> {
HttpParameters parameters = req.getParameters();
logger.log(Level.INFO, "got post " + parameters.toString());
logger.log(Level.INFO, "got request " + parameters.toString() + " , sending, OK");
ServerResponse.write(resp, HttpResponseStatus.OK);
}, "POST")
}, "GET", "POST")
.build();
Server server = Server.builder(domain)
.build();
@ -40,18 +42,23 @@ class PostTest {
final AtomicBoolean success = new AtomicBoolean(false);
try {
server.accept();
Request request = Request.post().setVersion(HttpVersion.HTTP_1_1)
ResponseListener<HttpResponse> responseListener = (resp) -> {
logger.log(Level.INFO, "got response = " + resp);
if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) {
success.set(true);
}
};
Request postRequest = Request.post().setVersion(HttpVersion.HTTP_1_1)
.url(server.getServerConfig().getAddress().base().resolve("/post/test.txt"))
.addParameter("a", "b")
.addFormParameter("name", "Jörg")
.build()
.setResponseListener(resp -> {
if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) {
success.set(true);
}
});
client.execute(request).get();
logger.log(Level.INFO, "request complete");
.setResponseListener(responseListener);
client.execute(postRequest).get();
logger.log(Level.INFO, "complete");
} finally {
server.shutdownGracefully();
client.shutdownGracefully();
@ -60,14 +67,13 @@ class PostTest {
assertTrue(success.get());
}
@Test
void testPostHttp2() throws Exception {
HttpAddress httpAddress = HttpAddress.http2("localhost", 8008);
Domain domain = Domain.builder(httpAddress)
.singleEndpoint("/post", "/**", (req, resp) -> {
HttpParameters parameters = req.getParameters();
logger.log(Level.INFO, "got post " + parameters.toString());
logger.log(Level.INFO, "got request " + parameters.toString(), ", sending OK");
ServerResponse.write(resp, HttpResponseStatus.OK);
}, "POST")
.build();
@ -78,18 +84,23 @@ class PostTest {
final AtomicBoolean success = new AtomicBoolean(false);
try {
server.accept();
Request request = Request.post().setVersion("HTTP/2.0")
ResponseListener<HttpResponse> responseListener = (resp) -> {
logger.log(Level.INFO, "got response = " + resp);
if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) {
success.set(true);
}
};
Request postRequest = Request.post().setVersion("HTTP/2.0")
.url(server.getServerConfig().getAddress().base().resolve("/post/test.txt"))
.addParameter("a", "b")
.addFormParameter("name", "Jörg")
.build()
.setResponseListener(resp -> {
if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) {
success.set(true);
}
});
client.execute(request).get();
logger.log(Level.INFO, "request complete");
.setResponseListener(responseListener);
client.execute(postRequest).get();
logger.log(Level.INFO, "complete");
} finally {
server.shutdownGracefully();
client.shutdownGracefully();

View file

@ -22,7 +22,7 @@ import java.util.logging.Logger;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
class SecureFileServiceTest {
private static final Logger logger = Logger.getLogger(SecureFileServiceTest.class.getName());

View file

@ -23,7 +23,7 @@ import java.util.logging.Logger;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
class SecureHttp1Test {
private static final Logger logger = Logger.getLogger(SecureHttp1Test.class.getName());
@ -38,6 +38,7 @@ class SecureHttp1Test {
.withContentType("text/plain")
.write(request.getContent().retain()))
.build())
.enableDebug()
.build();
Client client = Client.builder()
.trustInsecure()

View file

@ -23,7 +23,7 @@ import java.util.logging.Logger;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
class SecureHttp2Test {
private static final Logger logger = Logger.getLogger(SecureHttp2Test.class.getName());

View file

@ -17,7 +17,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertEquals;
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
class StreamTest {
@Test

View file

@ -15,7 +15,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
class ThreadLeakTest {
private static final Logger logger = Logger.getLogger(ThreadLeakTest.class.getName());

View file

@ -31,7 +31,7 @@ import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.netty.http.server.test.NettyHttpExtension;
import org.xbib.netty.http.server.test.NettyHttpTestExtension;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
@ -39,7 +39,7 @@ import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
class CleartextHttp2Test {
private static final Logger clientLogger = Logger.getLogger("client");

View file

@ -26,7 +26,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.netty.http.server.handler.http.HttpPipelinedRequest;
import org.xbib.netty.http.server.handler.http.HttpPipelinedResponse;
import org.xbib.netty.http.server.handler.http.HttpPipeliningHandler;
import org.xbib.netty.http.server.test.NettyHttpExtension;
import org.xbib.netty.http.server.test.NettyHttpTestExtension;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
@ -52,7 +52,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/** flaky */
@Disabled
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
class HttpPipeliningHandlerTest {
private static final Logger logger = Logger.getLogger(HttpPipeliningHandlerTest.class.getName());

View file

@ -42,7 +42,7 @@ import io.netty.handler.logging.LoggingHandler;
import io.netty.util.AsciiString;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.netty.http.server.test.NettyHttpExtension;
import org.xbib.netty.http.server.test.NettyHttpTestExtension;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
@ -68,7 +68,7 @@ import java.util.logging.Logger;
*
*
*/
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
class MultiplexCodecCleartextHttp2Test {
private static final Logger clientLogger = Logger.getLogger("client");
@ -82,8 +82,8 @@ class MultiplexCodecCleartextHttp2Test {
@Test
void testMultiplexHttp2() throws Exception {
Http2FrameLogger serverFrameLogger = new Http2FrameLogger(LogLevel.INFO, "server");
Http2FrameLogger clientFrameLogger = new Http2FrameLogger(LogLevel.INFO, "client");
Http2FrameLogger serverFrameLogger = new Http2FrameLogger(LogLevel.DEBUG, "server");
Http2FrameLogger clientFrameLogger = new Http2FrameLogger(LogLevel.DEBUG, "client");
EventLoopGroup serverEventLoopGroup = new NioEventLoopGroup();
EventLoopGroup clientEventLoopGroup = new NioEventLoopGroup();
try {
@ -99,7 +99,8 @@ class MultiplexCodecCleartextHttp2Test {
@Override
protected void initChannel(Channel channel) {
ChannelPipeline p = channel.pipeline();
p.addLast("multiplex-server-traffic", new TrafficLoggingHandler("multiplex-server-traffic", LogLevel.INFO));
p.addLast("multiplex-server-traffic",
new TrafficLoggingHandler("multiplex-server-traffic", LogLevel.DEBUG));
p.addLast("multiplex-server-frame-converter", new Http2StreamFrameToHttpObjectCodec(true));
p.addLast("multiplex-server-chunk-aggregator", new HttpObjectAggregator(1048576));
p.addLast("multiplex-server-request-handler", new ServerRequestHandler());
@ -119,7 +120,8 @@ class MultiplexCodecCleartextHttp2Test {
HttpServerUpgradeHandler upgradeHandler = new HttpServerUpgradeHandler(sourceCodec, upgradeCodecFactory);
CleartextHttp2ServerUpgradeHandler cleartextHttp2ServerUpgradeHandler =
new CleartextHttp2ServerUpgradeHandler(sourceCodec, upgradeHandler, serverMultiplexCodec);
p.addLast("server-traffic", new TrafficLoggingHandler("server-traffic", LogLevel.INFO));
p.addLast("server-traffic",
new TrafficLoggingHandler("server-traffic", LogLevel.DEBUG));
p.addLast("server-upgrade", cleartextHttp2ServerUpgradeHandler);
p.addLast("server-messages", new ServerMessages());
}
@ -164,7 +166,8 @@ class MultiplexCodecCleartextHttp2Test {
@Override
protected void initChannel(Channel ch) {
ChannelPipeline p = ch.pipeline();
p.addLast("child-client-traffic", new TrafficLoggingHandler("child-client-traffic", LogLevel.INFO));
p.addLast("child-client-traffic",
new TrafficLoggingHandler("child-client-traffic", LogLevel.DEBUG));
p.addLast("child-client-frame-converter", new Http2StreamFrameToHttpObjectCodec(false));
p.addLast("child-client-chunk-aggregator", new HttpObjectAggregator(1048576));
p.addLast("child-client-response-handler", new ClientResponseHandler());
@ -195,7 +198,7 @@ class MultiplexCodecCleartextHttp2Test {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse msg) {
clientLogger.log(Level.INFO, "response received on client: " + msg);
clientLogger.log(Level.FINE, "response received on client: " + msg);
responseFuture.complete(true);
}
}
@ -226,11 +229,11 @@ class MultiplexCodecCleartextHttp2Test {
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) {
serverLogger.log(Level.INFO, "request received on server: " + msg +
serverLogger.log(Level.FINE, "request received on server: " + msg +
" path = " + msg);
DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
HttpResponseStatus.OK);
serverLogger.log(Level.INFO, "writing server response: " + response);
serverLogger.log(Level.FINE, "writing server response: " + response);
ctx.writeAndFlush(response);
}
}

View file

@ -29,7 +29,7 @@ import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandlerBuilder;
import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapterBuilder;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.netty.http.server.test.NettyHttpExtension;
import org.xbib.netty.http.server.test.NettyHttpTestExtension;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
@ -40,7 +40,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
class MultithreadedCleartextHttp2Test {
private static final Logger clientLogger = Logger.getLogger("client");

View file

@ -37,7 +37,7 @@ import io.netty.handler.codec.http2.Http2StreamFrameToHttpObjectCodec;
import io.netty.util.AsciiString;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.xbib.netty.http.server.test.NettyHttpExtension;
import org.xbib.netty.http.server.test.NettyHttpTestExtension;
import java.net.InetSocketAddress;
import java.util.concurrent.CompletableFuture;
@ -53,7 +53,7 @@ import java.util.logging.Logger;
* Multithreaded Http2MultiplexCodec demo for cleartext HTTP/2 between a server and a client.
*
*/
@ExtendWith(NettyHttpExtension.class)
@ExtendWith(NettyHttpTestExtension.class)
class MultithreadedMultiplexCodecCleartextHttp2Test {
private static final Logger clientLogger = Logger.getLogger("client");

View file

@ -1,5 +0,0 @@
handlers = java.util.logging.ConsoleHandler
.level = FINE
java.util.logging.ConsoleHandler.level = FINE
java.util.logging.ConsoleHandler.formatter = java.util.logging.SimpleFormatter
java.util.logging.SimpleFormatter.format = %1$tFT%1$tT.%1$tL%1$tz [%4$-11s] [%3$s] %5$s %6$s%n

View file

@ -1,3 +1,4 @@
include 'netty-http-common'
include 'netty-http-client'
include 'netty-http-client-rest'