better error handling in mixed protocol situations, update tests to mockito 3.1.0, jackson 2.9.10
This commit is contained in:
parent
e9279d1fba
commit
5490eec9f8
8 changed files with 179 additions and 24 deletions
|
@ -1,6 +1,6 @@
|
|||
plugins {
|
||||
id "com.github.spotbugs" version "2.0.0"
|
||||
id "io.codearte.nexus-staging" version "0.21.0"
|
||||
id "com.github.spotbugs" version "2.0.1"
|
||||
id "io.codearte.nexus-staging" version "0.21.1"
|
||||
id "org.xbib.gradle.plugin.asciidoctor" version "1.5.6.0.1"
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
group = org.xbib
|
||||
name = netty-http
|
||||
version = 4.1.43.0
|
||||
version = 4.1.43.1
|
||||
|
||||
# netty
|
||||
netty.version = 4.1.43.Final
|
||||
|
@ -19,15 +19,16 @@ reactivestreams.version = 1.0.2
|
|||
xbib-guice.version = 4.0.4
|
||||
|
||||
# for rx
|
||||
reactivex.version = 1.2.+
|
||||
reactivex.version = 1.2.10
|
||||
|
||||
# test
|
||||
junit.version = 5.5.1
|
||||
junit.version = 5.5.2
|
||||
junit4.version = 4.12
|
||||
conscrypt.version = 2.2.1
|
||||
jackson.version = 2.9.9
|
||||
hamcrest.version = 1.3
|
||||
mockito.version = 1.10.19
|
||||
jackson.version = 2.9.10
|
||||
hamcrest.version = 2.1
|
||||
#mockito.version = 1.10.19
|
||||
mockito.version = 3.1.0
|
||||
|
||||
# doc
|
||||
asciidoclet.version = 1.5.4
|
||||
asciidoclet.version = 1.5.6
|
||||
|
|
|
@ -57,7 +57,8 @@ public class Http1ChannelInitializer extends ChannelInitializer<Channel> impleme
|
|||
configureCleartext(channel);
|
||||
}
|
||||
if (clientConfig.isDebug()) {
|
||||
logger.log(Level.FINE, "HTTP 1.1 client channel initialized: " + channel.pipeline().names());
|
||||
logger.log(Level.FINE, "HTTP 1.1 client channel initialized: " +
|
||||
" address=" + httpAddress + " pipeline=" + channel.pipeline().names());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -88,7 +89,7 @@ public class Http1ChannelInitializer extends ChannelInitializer<Channel> impleme
|
|||
throw new IllegalStateException("protocol not accepted: " + protocol);
|
||||
}
|
||||
};
|
||||
channel.pipeline().addLast(negotiationHandler);
|
||||
channel.pipeline().addLast("client-negotiation", negotiationHandler);
|
||||
} else {
|
||||
configureCleartext(channel);
|
||||
}
|
||||
|
|
|
@ -52,7 +52,8 @@ public class Http2ChannelInitializer extends ChannelInitializer<Channel> impleme
|
|||
configureCleartext(channel);
|
||||
}
|
||||
if (clientConfig.isDebug()) {
|
||||
logger.log(Level.FINE, "HTTP/2 client channel initialized: " + channel.pipeline().names());
|
||||
logger.log(Level.FINE, "HTTP/2 client channel initialized: " +
|
||||
" address=" + httpAddress + " pipeline=" + channel.pipeline().names());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -2,6 +2,6 @@ dependencies {
|
|||
compile "io.netty:netty-codec-http:${project.property('netty.version')}"
|
||||
compile "io.netty:netty-transport-native-epoll:${project.property('netty.version')}"
|
||||
compile "io.reactivex:rxjava:${project.property('reactivex.version')}"
|
||||
testCompile "org.hamcrest:hamcrest-all:${project.property('hamcrest.version')}"
|
||||
testCompile "org.mockito:mockito-all:${project.property('mockito.version')}"
|
||||
testCompile "org.hamcrest:hamcrest-library:${project.property('hamcrest.version')}"
|
||||
testCompile "org.mockito:mockito-core:${project.property('mockito.version')}"
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
|
|||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.handler.codec.http.DefaultFullHttpResponse;
|
||||
import io.netty.handler.codec.http.DefaultHttpResponse;
|
||||
import io.netty.handler.codec.http.FullHttpRequest;
|
||||
import io.netty.handler.codec.http.HttpContentCompressor;
|
||||
import io.netty.handler.codec.http.HttpContentDecompressor;
|
||||
|
@ -42,7 +43,6 @@ public class Http1ChannelInitializer extends ChannelInitializer<Channel>
|
|||
|
||||
private final HttpAddress httpAddress;
|
||||
|
||||
|
||||
private final DomainNameMapping<SslContext> domainNameMapping;
|
||||
|
||||
public Http1ChannelInitializer(Server server,
|
||||
|
@ -67,7 +67,8 @@ public class Http1ChannelInitializer extends ChannelInitializer<Channel>
|
|||
configureCleartext(channel);
|
||||
}
|
||||
if (serverConfig.isTrafficDebug()) {
|
||||
logger.log(Level.FINE, "HTTP 1 channel initialized: " + channel.pipeline().names());
|
||||
logger.log(Level.FINE, "HTTP 1.1 server channel initialized: " +
|
||||
" address=" + httpAddress + " pipeline=" + channel.pipeline().names());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -97,18 +98,18 @@ public class Http1ChannelInitializer extends ChannelInitializer<Channel>
|
|||
httpObjectAggregator.setMaxCumulationBufferComponents(serverConfig.getMaxCompositeBufferComponents());
|
||||
pipeline.addLast("http-server-aggregator", httpObjectAggregator);
|
||||
pipeline.addLast("http-server-pipelining", new HttpPipeliningHandler(serverConfig.getPipeliningCapacity()));
|
||||
pipeline.addLast("http-server-handler", new HttpHandler(server));
|
||||
pipeline.addLast("http-server-handler", new ServerMessages(server));
|
||||
pipeline.addLast("http-idle-timeout-handler", new IdleTimeoutHandler(serverConfig.getIdleTimeoutMillis()));
|
||||
}
|
||||
|
||||
@Sharable
|
||||
class HttpHandler extends ChannelInboundHandlerAdapter {
|
||||
class ServerMessages extends ChannelInboundHandlerAdapter {
|
||||
|
||||
private final Logger logger = Logger.getLogger(HttpHandler.class.getName());
|
||||
private final Logger logger = Logger.getLogger(ServerMessages.class.getName());
|
||||
|
||||
private final Server server;
|
||||
|
||||
public HttpHandler(Server server) {
|
||||
public ServerMessages(Server server) {
|
||||
this.server = server;
|
||||
}
|
||||
|
||||
|
@ -118,8 +119,15 @@ public class Http1ChannelInitializer extends ChannelInitializer<Channel>
|
|||
HttpPipelinedRequest httpPipelinedRequest = (HttpPipelinedRequest) msg;
|
||||
if (httpPipelinedRequest.getRequest() instanceof FullHttpRequest) {
|
||||
FullHttpRequest fullHttpRequest = (FullHttpRequest) httpPipelinedRequest.getRequest();
|
||||
Transport transport = server.newTransport(fullHttpRequest.protocolVersion());
|
||||
transport.requestReceived(ctx, fullHttpRequest, httpPipelinedRequest.getSequenceId());
|
||||
if (fullHttpRequest.protocolVersion().majorVersion() == 2) {
|
||||
// PRI * HTTP/2.0
|
||||
DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
|
||||
HttpResponseStatus.HTTP_VERSION_NOT_SUPPORTED);
|
||||
ctx.channel().writeAndFlush(response);
|
||||
} else {
|
||||
Transport transport = server.newTransport(fullHttpRequest.protocolVersion());
|
||||
transport.requestReceived(ctx, fullHttpRequest, httpPipelinedRequest.getSequenceId());
|
||||
}
|
||||
fullHttpRequest.release();
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -7,12 +7,16 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
|
|||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.handler.codec.http.DefaultHttpRequest;
|
||||
import io.netty.handler.codec.http.DefaultHttpResponse;
|
||||
import io.netty.handler.codec.http.FullHttpRequest;
|
||||
import io.netty.handler.codec.http.HttpContentCompressor;
|
||||
import io.netty.handler.codec.http.HttpContentDecompressor;
|
||||
import io.netty.handler.codec.http.HttpObjectAggregator;
|
||||
import io.netty.handler.codec.http.HttpResponseStatus;
|
||||
import io.netty.handler.codec.http.HttpServerCodec;
|
||||
import io.netty.handler.codec.http.HttpServerUpgradeHandler;
|
||||
import io.netty.handler.codec.http.HttpVersion;
|
||||
import io.netty.handler.codec.http2.CleartextHttp2ServerUpgradeHandler;
|
||||
import io.netty.handler.codec.http2.DefaultHttp2SettingsFrame;
|
||||
import io.netty.handler.codec.http2.Http2CodecUtil;
|
||||
|
@ -74,8 +78,9 @@ public class Http2ChannelInitializer extends ChannelInitializer<Channel>
|
|||
} else {
|
||||
configureCleartext(channel);
|
||||
}
|
||||
if (server.getServerConfig().isTrafficDebug() && logger.isLoggable(Level.FINE)) {
|
||||
logger.log(Level.FINE, "HTTP/2 server channel initialized: " + channel.pipeline().names());
|
||||
if (serverConfig.isTrafficDebug()) {
|
||||
logger.log(Level.FINE, "HTTP/2 server channel initialized: " +
|
||||
" address=" + httpAddress + " pipeline=" + channel.pipeline().names());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -145,6 +150,10 @@ public class Http2ChannelInitializer extends ChannelInitializer<Channel>
|
|||
DefaultHttp2SettingsFrame http2SettingsFrame = (DefaultHttp2SettingsFrame) msg;
|
||||
Transport transport = ctx.channel().attr(Transport.TRANSPORT_ATTRIBUTE_KEY).get();
|
||||
transport.settingsReceived(ctx, http2SettingsFrame.settings());
|
||||
} else if (msg instanceof DefaultHttpRequest) {
|
||||
DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1,
|
||||
HttpResponseStatus.HTTP_VERSION_NOT_SUPPORTED);
|
||||
ctx.channel().writeAndFlush(response);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,135 @@
|
|||
package org.xbib.netty.http.server.test.http2;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.xbib.netty.http.client.Client;
|
||||
import org.xbib.netty.http.client.api.Request;
|
||||
import org.xbib.netty.http.client.api.Transport;
|
||||
import org.xbib.netty.http.common.HttpAddress;
|
||||
import org.xbib.netty.http.server.Domain;
|
||||
import org.xbib.netty.http.server.Server;
|
||||
import org.xbib.netty.http.server.api.ServerResponse;
|
||||
import org.xbib.netty.http.server.test.NettyHttpTestExtension;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import io.netty.handler.codec.http.HttpResponseStatus;
|
||||
|
||||
@ExtendWith(NettyHttpTestExtension.class)
|
||||
class MixedProtocolTest {
|
||||
|
||||
@Test
|
||||
void testHttp1ClientHttp2Server() throws Exception {
|
||||
HttpAddress httpAddress = HttpAddress.http2("localhost", 8008);
|
||||
Domain domain = Domain.builder(httpAddress)
|
||||
.singleEndpoint("/", (request, response) -> {
|
||||
ServerResponse.write(response, HttpResponseStatus.OK);
|
||||
})
|
||||
.build();
|
||||
Server server = Server.builder(domain)
|
||||
.build();
|
||||
Client client = Client.builder()
|
||||
.build();
|
||||
int max = 1;
|
||||
final AtomicInteger count = new AtomicInteger(0);
|
||||
try {
|
||||
server.accept();
|
||||
Request request = Request.get().setVersion("HTTP/1.1")
|
||||
.url(server.getServerConfig().getAddress().base().resolve("/"))
|
||||
.setResponseListener(resp -> {
|
||||
if (resp.getStatus().getCode() == HttpResponseStatus.HTTP_VERSION_NOT_SUPPORTED.code()) {
|
||||
count.incrementAndGet();
|
||||
}
|
||||
})
|
||||
.build();
|
||||
for (int i = 0; i < max; i++) {
|
||||
client.execute(request).get();
|
||||
}
|
||||
} finally {
|
||||
server.shutdownGracefully();
|
||||
client.shutdownGracefully();
|
||||
}
|
||||
assertEquals(max, count.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testHttp2ClientHttp1Server() throws Exception {
|
||||
HttpAddress httpAddress = HttpAddress.http1("localhost", 8008);
|
||||
Domain domain = Domain.builder(httpAddress)
|
||||
.singleEndpoint("/", (request, response) -> {
|
||||
ServerResponse.write(response, HttpResponseStatus.OK);
|
||||
})
|
||||
.build();
|
||||
Server server = Server.builder(domain)
|
||||
.build();
|
||||
Client client = Client.builder()
|
||||
.build();
|
||||
int max = 1;
|
||||
final AtomicInteger count = new AtomicInteger(0);
|
||||
try {
|
||||
server.accept();
|
||||
Request request = Request.get().setVersion("HTTP/2.0")
|
||||
.url(server.getServerConfig().getAddress().base().resolve("/"))
|
||||
.setResponseListener(resp -> {
|
||||
// do nothing
|
||||
})
|
||||
.build();
|
||||
for (int i = 0; i < max; i++) {
|
||||
// HTTP 2 breaks transport
|
||||
Transport transport = client.execute(request).get();
|
||||
if (transport.isFailed()) {
|
||||
count.incrementAndGet();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
server.shutdownGracefully();
|
||||
client.shutdownGracefully();
|
||||
}
|
||||
assertEquals(max, count.get());
|
||||
}
|
||||
|
||||
@Disabled("negotiation does not work")
|
||||
@Test
|
||||
void testHttp1ClientHttp2ServerWithNegotiation() throws Exception {
|
||||
HttpAddress httpAddress = HttpAddress.secureHttp2("localhost", 8143);
|
||||
Domain domain = Domain.builder(httpAddress)
|
||||
.setSelfCert()
|
||||
.singleEndpoint("/", (request, response) -> {
|
||||
ServerResponse.write(response, HttpResponseStatus.OK);
|
||||
})
|
||||
.build();
|
||||
Server server = Server.builder(domain)
|
||||
//.enableDebug()
|
||||
.setTransportLayerSecurityProtocols(new String[]{"TLSv1.2"})
|
||||
.build();
|
||||
Client client = Client.builder()
|
||||
//.enableDebug()
|
||||
.trustInsecure()
|
||||
.enableNegotiation(true)
|
||||
.build();
|
||||
int max = 1;
|
||||
final AtomicInteger count = new AtomicInteger(0);
|
||||
try {
|
||||
server.accept();
|
||||
httpAddress = HttpAddress.secureHttp1("localhost", 8143);
|
||||
Request request = Request.get().setVersion("HTTP/1.1")
|
||||
.url(httpAddress.base().resolve("/"))
|
||||
.setResponseListener(resp -> {
|
||||
count.incrementAndGet();
|
||||
})
|
||||
.build();
|
||||
for (int i = 0; i < max; i++) {
|
||||
Transport transport = client.execute(request).get();
|
||||
if (transport.isFailed()) {
|
||||
count.incrementAndGet();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
server.shutdownGracefully();
|
||||
client.shutdownGracefully();
|
||||
}
|
||||
assertEquals(max, count.get());
|
||||
}
|
||||
|
||||
|
||||
}
|
Loading…
Reference in a new issue