add local/remote address to server request

This commit is contained in:
Jörg Prante 2019-08-26 12:58:22 +02:00
parent 1c1260bba6
commit 5c14695785
11 changed files with 55 additions and 20 deletions

View file

@ -1,6 +1,6 @@
group = org.xbib group = org.xbib
name = netty-http name = netty-http
version = 4.1.39.1 version = 4.1.39.2
# netty # netty
netty.version = 4.1.39.Final netty.version = 4.1.39.Final

View file

@ -93,8 +93,12 @@ public class RestClient {
RestClient restClient = new RestClient(); RestClient restClient = new RestClient();
Request.Builder requestBuilder = Request.builder(httpMethod).url(url); Request.Builder requestBuilder = Request.builder(httpMethod).url(url);
requestBuilder.content(byteBuf); requestBuilder.content(byteBuf);
try {
client.newTransport(HttpAddress.http1(url)) client.newTransport(HttpAddress.http1(url))
.execute(requestBuilder.build().setResponseListener(restClient::setResponse)).close(); .execute(requestBuilder.build().setResponseListener(restClient::setResponse)).close();
} catch (Exception e) {
throw new IOException(e);
}
return restClient; return restClient;
} }
} }

View file

@ -324,7 +324,6 @@ public final class Client implements AutoCloseable {
for (Transport transport : transports) { for (Transport transport : transports) {
close(transport); close(transport);
} }
// how to wait for all responses for the pool?
if (hasPooledConnections()) { if (hasPooledConnections()) {
pool.close(); pool.close();
} }
@ -337,9 +336,14 @@ public final class Client implements AutoCloseable {
} }
private void close(Transport transport) throws IOException { private void close(Transport transport) throws IOException {
try {
transport.close(); transport.close();
} catch (Exception e) {
throw new IOException(e);
} finally {
transports.remove(transport); transports.remove(transport);
} }
}
/** /**
* Initialize trust manager factory once per client lifecycle. * Initialize trust manager factory once per client lifecycle.

View file

@ -98,6 +98,11 @@ abstract class BaseTransport implements Transport {
if (!channels.isEmpty()) { if (!channels.isEmpty()) {
get(); get();
} }
for (Flow flow : channelFlowMap.values()) {
flow.close();
}
channels.clear();
requests.clear();
} }
@Override @Override

View file

@ -155,6 +155,7 @@ public class Http2Transport extends BaseTransport {
channelId = pos > 0 ? channelId.substring(0, pos) : channelId; channelId = pos > 0 ? channelId.substring(0, pos) : channelId;
Flow flow = channelFlowMap.get(channelId); Flow flow = channelFlowMap.get(channelId);
if (flow == null) { if (flow == null) {
// should never happen since we keep the channelFlowMap around
if (logger.isLoggable(Level.WARNING)) { if (logger.isLoggable(Level.WARNING)) {
logger.log(Level.WARNING, "flow is null? channelId = " + channelId); logger.log(Level.WARNING, "flow is null? channelId = " + channelId);
} }
@ -162,11 +163,12 @@ public class Http2Transport extends BaseTransport {
} }
Request request = requests.remove(getRequestKey(channelId, streamId)); Request request = requests.remove(getRequestKey(channelId, streamId));
if (request == null) { if (request == null) {
CompletableFuture<Boolean> promise = flow.get(streamId);
if (promise != null) {
if (logger.isLoggable(Level.WARNING)) { if (logger.isLoggable(Level.WARNING)) {
logger.log(Level.WARNING, "request is null? channelId = " + channelId + " streamId = " + streamId); logger.log(Level.WARNING, "request is null? channelId = " + channelId + " streamId = " + streamId);
} }
// even if request is null, we may complete the flow with an exception
CompletableFuture<Boolean> promise = flow.get(streamId);
if (promise != null) {
promise.completeExceptionally(new IllegalStateException("no request")); promise.completeExceptionally(new IllegalStateException("no request"));
} }
} else { } else {

View file

@ -16,7 +16,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Function; import java.util.function.Function;
public interface Transport { public interface Transport extends AutoCloseable {
AttributeKey<Transport> TRANSPORT_ATTRIBUTE_KEY = AttributeKey.valueOf("transport"); AttributeKey<Transport> TRANSPORT_ATTRIBUTE_KEY = AttributeKey.valueOf("transport");
@ -52,5 +52,4 @@ public interface Transport {
SSLSession getSession(); SSLSession getSession();
void close() throws IOException;
} }

View file

@ -2,7 +2,6 @@ package org.xbib.netty.http.server;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.ByteBufInputStream;
import io.netty.channel.Channel;
import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpMethod;
import org.xbib.net.URL; import org.xbib.net.URL;
@ -11,6 +10,7 @@ import org.xbib.netty.http.server.endpoint.HttpEndpointDescriptor;
import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSession;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -46,10 +46,14 @@ public interface ServerRequest {
Long getRequestId(); Long getRequestId();
SSLSession getSession();
ByteBuf getContent(); ByteBuf getContent();
ByteBufInputStream getInputStream(); ByteBufInputStream getInputStream();
SSLSession getSession();
InetSocketAddress getLocalAddress();
InetSocketAddress getRemoteAddress();
} }

View file

@ -25,7 +25,7 @@ public class Http2Transport extends BaseTransport {
@Override @Override
public void requestReceived(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest, Integer sequenceId) throws IOException { public void requestReceived(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest, Integer sequenceId) throws IOException {
Domain domain = server.getNamedServer(fullHttpRequest.headers().get(HttpHeaderNames.HOST)); Domain domain = server.getNamedServer(fullHttpRequest.headers().get(HttpHeaderNames.HOST));
HttpServerRequest serverRequest = new HttpServerRequest(server, fullHttpRequest); HttpServerRequest serverRequest = new HttpServerRequest(server, fullHttpRequest, ctx);
try { try {
serverRequest.setSequenceId(sequenceId); serverRequest.setSequenceId(sequenceId);
serverRequest.setRequestId(server.getRequestCounter().incrementAndGet()); serverRequest.setRequestId(server.getRequestCounter().incrementAndGet());

View file

@ -17,6 +17,7 @@ import org.xbib.netty.http.server.endpoint.HttpEndpointDescriptor;
import javax.net.ssl.SSLSession; import javax.net.ssl.SSLSession;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.MalformedInputException; import java.nio.charset.MalformedInputException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.nio.charset.UnmappableCharacterException; import java.nio.charset.UnmappableCharacterException;
@ -25,7 +26,8 @@ import java.util.List;
import java.util.Map; import java.util.Map;
/** /**
* The {@code HttpServerRequest} class encapsulates a single request. There must be a default constructor. * The {@code HttpServerRequest} class encapsulates a single request.
* There must be a default constructor.
*/ */
public class HttpServerRequest implements ServerRequest { public class HttpServerRequest implements ServerRequest {
@ -35,6 +37,8 @@ public class HttpServerRequest implements ServerRequest {
private final FullHttpRequest httpRequest; private final FullHttpRequest httpRequest;
private final ChannelHandlerContext ctx;
private List<String> context; private List<String> context;
private String contextPath; private String contextPath;
@ -55,8 +59,11 @@ public class HttpServerRequest implements ServerRequest {
private SSLSession sslSession; private SSLSession sslSession;
HttpServerRequest(Server server, FullHttpRequest fullHttpRequest) { HttpServerRequest(Server server, FullHttpRequest fullHttpRequest,
ChannelHandlerContext ctx) {
// server not required yet
this.httpRequest = fullHttpRequest.retainedDuplicate(); this.httpRequest = fullHttpRequest.retainedDuplicate();
this.ctx = ctx;
this.httpEndpointDescriptor = new HttpEndpointDescriptor(this); this.httpEndpointDescriptor = new HttpEndpointDescriptor(this);
} }
@ -180,6 +187,16 @@ public class HttpServerRequest implements ServerRequest {
return sslSession; return sslSession;
} }
@Override
public InetSocketAddress getLocalAddress() {
return (InetSocketAddress) ctx.channel().localAddress();
}
@Override
public InetSocketAddress getRemoteAddress() {
return (InetSocketAddress) ctx.channel().remoteAddress();
}
@Override @Override
public ByteBuf getContent() { public ByteBuf getContent() {
return httpRequest.content(); return httpRequest.content();

View file

@ -22,7 +22,7 @@ public class HttpTransport extends BaseTransport {
public void requestReceived(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest, Integer sequenceId) public void requestReceived(ChannelHandlerContext ctx, FullHttpRequest fullHttpRequest, Integer sequenceId)
throws IOException { throws IOException {
Domain domain = server.getNamedServer(fullHttpRequest.headers().get(HttpHeaderNames.HOST)); Domain domain = server.getNamedServer(fullHttpRequest.headers().get(HttpHeaderNames.HOST));
HttpServerRequest serverRequest = new HttpServerRequest(server, fullHttpRequest); HttpServerRequest serverRequest = new HttpServerRequest(server, fullHttpRequest, ctx);
try { try {
serverRequest.setSequenceId(sequenceId); serverRequest.setSequenceId(sequenceId);
serverRequest.setRequestId(server.getRequestCounter().incrementAndGet()); serverRequest.setRequestId(server.getRequestCounter().incrementAndGet());

View file

@ -127,7 +127,7 @@ class CleartextHttp2Test {
@Test @Test
void testMultithreadPooledClearTextHttp2() throws Exception { void testMultithreadPooledClearTextHttp2() throws Exception {
int threads = 2; int threads = 2;
int loop = 2000; int loop = 1000;
HttpAddress httpAddress = HttpAddress.http2("localhost", 8008); HttpAddress httpAddress = HttpAddress.http2("localhost", 8008);
Domain domain = Domain.builder(httpAddress) Domain domain = Domain.builder(httpAddress)
.singleEndpoint("/", (request, response) -> .singleEndpoint("/", (request, response) ->
@ -197,7 +197,7 @@ class CleartextHttp2Test {
@Test @Test
void testTwoPooledClearTextHttp2() throws Exception { void testTwoPooledClearTextHttp2() throws Exception {
int threads = 2; int threads = 2;
int loop = 4000; int loop = 1000;
HttpAddress httpAddress1 = HttpAddress.http2("localhost", 8008); HttpAddress httpAddress1 = HttpAddress.http2("localhost", 8008);
AtomicInteger counter1 = new AtomicInteger(); AtomicInteger counter1 = new AtomicInteger();
Domain domain1 = Domain.builder(httpAddress1) Domain domain1 = Domain.builder(httpAddress1)