From 5c14695785662b5108c56a52695d7ef49109d10d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Mon, 26 Aug 2019 12:58:22 +0200 Subject: [PATCH] add local/remote address to server request --- gradle.properties | 2 +- .../netty/http/client/rest/RestClient.java | 8 +++++-- .../org/xbib/netty/http/client/Client.java | 10 ++++++--- .../http/client/transport/BaseTransport.java | 5 +++++ .../http/client/transport/Http2Transport.java | 8 ++++--- .../http/client/transport/Transport.java | 3 +-- .../xbib/netty/http/server/ServerRequest.java | 10 ++++++--- .../http/server/transport/Http2Transport.java | 2 +- .../server/transport/HttpServerRequest.java | 21 +++++++++++++++++-- .../http/server/transport/HttpTransport.java | 2 +- .../http/server/test/CleartextHttp2Test.java | 4 ++-- 11 files changed, 55 insertions(+), 20 deletions(-) diff --git a/gradle.properties b/gradle.properties index b070a93..710d53f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group = org.xbib name = netty-http -version = 4.1.39.1 +version = 4.1.39.2 # netty netty.version = 4.1.39.Final diff --git a/netty-http-client-rest/src/main/java/org/xbib/netty/http/client/rest/RestClient.java b/netty-http-client-rest/src/main/java/org/xbib/netty/http/client/rest/RestClient.java index 64b8288..3b6ea13 100644 --- a/netty-http-client-rest/src/main/java/org/xbib/netty/http/client/rest/RestClient.java +++ b/netty-http-client-rest/src/main/java/org/xbib/netty/http/client/rest/RestClient.java @@ -93,8 +93,12 @@ public class RestClient { RestClient restClient = new RestClient(); Request.Builder requestBuilder = Request.builder(httpMethod).url(url); requestBuilder.content(byteBuf); - client.newTransport(HttpAddress.http1(url)) - .execute(requestBuilder.build().setResponseListener(restClient::setResponse)).close(); + try { + client.newTransport(HttpAddress.http1(url)) + .execute(requestBuilder.build().setResponseListener(restClient::setResponse)).close(); + } catch (Exception e) { + throw new IOException(e); + } return restClient; } } diff --git a/netty-http-client/src/main/java/org/xbib/netty/http/client/Client.java b/netty-http-client/src/main/java/org/xbib/netty/http/client/Client.java index 5142278..6046a96 100644 --- a/netty-http-client/src/main/java/org/xbib/netty/http/client/Client.java +++ b/netty-http-client/src/main/java/org/xbib/netty/http/client/Client.java @@ -324,7 +324,6 @@ public final class Client implements AutoCloseable { for (Transport transport : transports) { close(transport); } - // how to wait for all responses for the pool? if (hasPooledConnections()) { pool.close(); } @@ -337,8 +336,13 @@ public final class Client implements AutoCloseable { } private void close(Transport transport) throws IOException { - transport.close(); - transports.remove(transport); + try { + transport.close(); + } catch (Exception e) { + throw new IOException(e); + } finally { + transports.remove(transport); + } } /** diff --git a/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java b/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java index 2593b26..7f15c11 100644 --- a/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java +++ b/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/BaseTransport.java @@ -98,6 +98,11 @@ abstract class BaseTransport implements Transport { if (!channels.isEmpty()) { get(); } + for (Flow flow : channelFlowMap.values()) { + flow.close(); + } + channels.clear(); + requests.clear(); } @Override diff --git a/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/Http2Transport.java b/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/Http2Transport.java index 5f77a0d..2d01a96 100644 --- a/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/Http2Transport.java +++ b/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/Http2Transport.java @@ -155,6 +155,7 @@ public class Http2Transport extends BaseTransport { channelId = pos > 0 ? channelId.substring(0, pos) : channelId; Flow flow = channelFlowMap.get(channelId); if (flow == null) { + // should never happen since we keep the channelFlowMap around if (logger.isLoggable(Level.WARNING)) { logger.log(Level.WARNING, "flow is null? channelId = " + channelId); } @@ -162,11 +163,12 @@ public class Http2Transport extends BaseTransport { } Request request = requests.remove(getRequestKey(channelId, streamId)); if (request == null) { + if (logger.isLoggable(Level.WARNING)) { + logger.log(Level.WARNING, "request is null? channelId = " + channelId + " streamId = " + streamId); + } + // even if request is null, we may complete the flow with an exception CompletableFuture promise = flow.get(streamId); if (promise != null) { - if (logger.isLoggable(Level.WARNING)) { - logger.log(Level.WARNING, "request is null? channelId = " + channelId + " streamId = " + streamId); - } promise.completeExceptionally(new IllegalStateException("no request")); } } else { diff --git a/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/Transport.java b/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/Transport.java index 0eb68b5..90a11c4 100644 --- a/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/Transport.java +++ b/netty-http-client/src/main/java/org/xbib/netty/http/client/transport/Transport.java @@ -16,7 +16,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Function; -public interface Transport { +public interface Transport extends AutoCloseable { AttributeKey TRANSPORT_ATTRIBUTE_KEY = AttributeKey.valueOf("transport"); @@ -52,5 +52,4 @@ public interface Transport { SSLSession getSession(); - void close() throws IOException; } diff --git a/netty-http-server/src/main/java/org/xbib/netty/http/server/ServerRequest.java b/netty-http-server/src/main/java/org/xbib/netty/http/server/ServerRequest.java index 8ac2555..3e4d76f 100644 --- a/netty-http-server/src/main/java/org/xbib/netty/http/server/ServerRequest.java +++ b/netty-http-server/src/main/java/org/xbib/netty/http/server/ServerRequest.java @@ -2,7 +2,6 @@ package org.xbib.netty.http.server; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; -import io.netty.channel.Channel; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; import org.xbib.net.URL; @@ -11,6 +10,7 @@ import org.xbib.netty.http.server.endpoint.HttpEndpointDescriptor; import javax.net.ssl.SSLSession; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.List; import java.util.Map; @@ -46,10 +46,14 @@ public interface ServerRequest { Long getRequestId(); - SSLSession getSession(); - ByteBuf getContent(); ByteBufInputStream getInputStream(); + SSLSession getSession(); + + InetSocketAddress getLocalAddress(); + + InetSocketAddress getRemoteAddress(); + } diff --git a/netty-http-server/src/main/java/org/xbib/netty/http/server/transport/Http2Transport.java b/netty-http-server/src/main/java/org/xbib/netty/http/server/transport/Http2Transport.java index b6c01f7..2acc524 100644 --- a/netty-http-server/src/main/java/org/xbib/netty/http/server/transport/Http2Transport.java +++ b/netty-http-server/src/main/java/org/xbib/netty/http/server/transport/Http2Transport.java @@ -25,7 +25,7 @@ public class Http2Transport extends BaseTransport { @Override public void requestReceived(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest, Integer sequenceId) throws IOException { Domain domain = server.getNamedServer(fullHttpRequest.headers().get(HttpHeaderNames.HOST)); - HttpServerRequest serverRequest = new HttpServerRequest(server, fullHttpRequest); + HttpServerRequest serverRequest = new HttpServerRequest(server, fullHttpRequest, ctx); try { serverRequest.setSequenceId(sequenceId); serverRequest.setRequestId(server.getRequestCounter().incrementAndGet()); diff --git a/netty-http-server/src/main/java/org/xbib/netty/http/server/transport/HttpServerRequest.java b/netty-http-server/src/main/java/org/xbib/netty/http/server/transport/HttpServerRequest.java index 4022a63..fb2e9eb 100644 --- a/netty-http-server/src/main/java/org/xbib/netty/http/server/transport/HttpServerRequest.java +++ b/netty-http-server/src/main/java/org/xbib/netty/http/server/transport/HttpServerRequest.java @@ -17,6 +17,7 @@ import org.xbib.netty.http.server.endpoint.HttpEndpointDescriptor; import javax.net.ssl.SSLSession; import java.io.IOException; +import java.net.InetSocketAddress; import java.nio.charset.MalformedInputException; import java.nio.charset.StandardCharsets; import java.nio.charset.UnmappableCharacterException; @@ -25,7 +26,8 @@ import java.util.List; import java.util.Map; /** - * The {@code HttpServerRequest} class encapsulates a single request. There must be a default constructor. + * The {@code HttpServerRequest} class encapsulates a single request. + * There must be a default constructor. */ public class HttpServerRequest implements ServerRequest { @@ -35,6 +37,8 @@ public class HttpServerRequest implements ServerRequest { private final FullHttpRequest httpRequest; + private final ChannelHandlerContext ctx; + private List context; private String contextPath; @@ -55,8 +59,11 @@ public class HttpServerRequest implements ServerRequest { private SSLSession sslSession; - HttpServerRequest(Server server, FullHttpRequest fullHttpRequest) { + HttpServerRequest(Server server, FullHttpRequest fullHttpRequest, + ChannelHandlerContext ctx) { + // server not required yet this.httpRequest = fullHttpRequest.retainedDuplicate(); + this.ctx = ctx; this.httpEndpointDescriptor = new HttpEndpointDescriptor(this); } @@ -180,6 +187,16 @@ public class HttpServerRequest implements ServerRequest { return sslSession; } + @Override + public InetSocketAddress getLocalAddress() { + return (InetSocketAddress) ctx.channel().localAddress(); + } + + @Override + public InetSocketAddress getRemoteAddress() { + return (InetSocketAddress) ctx.channel().remoteAddress(); + } + @Override public ByteBuf getContent() { return httpRequest.content(); diff --git a/netty-http-server/src/main/java/org/xbib/netty/http/server/transport/HttpTransport.java b/netty-http-server/src/main/java/org/xbib/netty/http/server/transport/HttpTransport.java index 4f5b415..6760cd5 100644 --- a/netty-http-server/src/main/java/org/xbib/netty/http/server/transport/HttpTransport.java +++ b/netty-http-server/src/main/java/org/xbib/netty/http/server/transport/HttpTransport.java @@ -22,7 +22,7 @@ public class HttpTransport extends BaseTransport { public void requestReceived(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest, Integer sequenceId) throws IOException { Domain domain = server.getNamedServer(fullHttpRequest.headers().get(HttpHeaderNames.HOST)); - HttpServerRequest serverRequest = new HttpServerRequest(server, fullHttpRequest); + HttpServerRequest serverRequest = new HttpServerRequest(server, fullHttpRequest, ctx); try { serverRequest.setSequenceId(sequenceId); serverRequest.setRequestId(server.getRequestCounter().incrementAndGet()); diff --git a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/CleartextHttp2Test.java b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/CleartextHttp2Test.java index 891dff5..177a838 100644 --- a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/CleartextHttp2Test.java +++ b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/CleartextHttp2Test.java @@ -127,7 +127,7 @@ class CleartextHttp2Test { @Test void testMultithreadPooledClearTextHttp2() throws Exception { int threads = 2; - int loop = 2000; + int loop = 1000; HttpAddress httpAddress = HttpAddress.http2("localhost", 8008); Domain domain = Domain.builder(httpAddress) .singleEndpoint("/", (request, response) -> @@ -197,7 +197,7 @@ class CleartextHttp2Test { @Test void testTwoPooledClearTextHttp2() throws Exception { int threads = 2; - int loop = 4000; + int loop = 1000; HttpAddress httpAddress1 = HttpAddress.http2("localhost", 8008); AtomicInteger counter1 = new AtomicInteger(); Domain domain1 = Domain.builder(httpAddress1)