refactoring

This commit is contained in:
Jörg Prante 2017-05-25 23:54:00 +02:00
parent 0a4cf575d6
commit 33c76b8df0
21 changed files with 416 additions and 250 deletions

View file

@ -1,6 +1,6 @@
group = org.xbib group = org.xbib
name = netty-http-client name = netty-http-client
version = 4.1.11.1 version = 4.1.11.3
netty.version = 4.1.11.Final netty.version = 4.1.11.Final
tcnative.version = 2.0.1.Final tcnative.version = 2.0.1.Final

View file

@ -38,6 +38,7 @@ import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.ssl.OpenSsl; import io.netty.handler.ssl.OpenSsl;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import org.xbib.netty.http.client.internal.HttpClientChannelPoolMap;
import org.xbib.netty.http.client.listener.CookieListener; import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.ExceptionListener; import org.xbib.netty.http.client.listener.ExceptionListener;
import org.xbib.netty.http.client.listener.HttpHeadersListener; import org.xbib.netty.http.client.listener.HttpHeadersListener;
@ -230,7 +231,7 @@ public final class HttpClient implements Closeable {
logger.log(Level.FINE, () -> "closed"); logger.log(Level.FINE, () -> "closed");
} }
void dispatch(final HttpRequestContext httpRequestContext) { public void dispatch(final HttpRequestContext httpRequestContext) {
final URI uri = httpRequestContext.getURI(); final URI uri = httpRequestContext.getURI();
final HttpRequest httpRequest = httpRequestContext.getHttpRequest(); final HttpRequest httpRequest = httpRequestContext.getHttpRequest();
if (!httpRequestContext.getCookies().isEmpty()) { if (!httpRequestContext.getCookies().isEmpty()) {
@ -262,17 +263,17 @@ public final class HttpClient implements Closeable {
// set settings promise before adding httpRequestContext as a channel attribute // set settings promise before adding httpRequestContext as a channel attribute
ChannelPromise settingsPromise = channel.newPromise(); ChannelPromise settingsPromise = channel.newPromise();
httpRequestContext.setSettingsPromise(settingsPromise); httpRequestContext.setSettingsPromise(settingsPromise);
channel.attr(HttpClientChannelContext.CHANNEL_POOL_ATTRIBUTE_KEY).set(pool); channel.attr(HttpClientChannelContextDefaults.CHANNEL_POOL_ATTRIBUTE_KEY).set(pool);
channel.attr(HttpClientChannelContext.REQUEST_CONTEXT_ATTRIBUTE_KEY).set(httpRequestContext); channel.attr(HttpClientChannelContextDefaults.REQUEST_CONTEXT_ATTRIBUTE_KEY).set(httpRequestContext);
HttpResponseListener httpResponseListener = httpRequestContext.getHttpResponseListener(); HttpResponseListener httpResponseListener = httpRequestContext.getHttpResponseListener();
channel.attr(HttpClientChannelContext.RESPONSE_LISTENER_ATTRIBUTE_KEY).set(httpResponseListener); channel.attr(HttpClientChannelContextDefaults.RESPONSE_LISTENER_ATTRIBUTE_KEY).set(httpResponseListener);
HttpPushListener httpPushListener = httpRequestContext.getHttpPushListener(); HttpPushListener httpPushListener = httpRequestContext.getHttpPushListener();
channel.attr(HttpClientChannelContext.PUSH_LISTENER_ATTRIBUTE_KEY).set(httpPushListener); channel.attr(HttpClientChannelContextDefaults.PUSH_LISTENER_ATTRIBUTE_KEY).set(httpPushListener);
HttpHeadersListener httpHeadersListener = httpRequestContext.getHttpHeadersListener(); HttpHeadersListener httpHeadersListener = httpRequestContext.getHttpHeadersListener();
channel.attr(HttpClientChannelContext.HEADER_LISTENER_ATTRIBUTE_KEY).set(httpHeadersListener); channel.attr(HttpClientChannelContextDefaults.HEADER_LISTENER_ATTRIBUTE_KEY).set(httpHeadersListener);
CookieListener cookieListener = httpRequestContext.getCookieListener(); CookieListener cookieListener = httpRequestContext.getCookieListener();
channel.attr(HttpClientChannelContext.COOKIE_LISTENER_ATTRIBUTE_KEY).set(cookieListener); channel.attr(HttpClientChannelContextDefaults.COOKIE_LISTENER_ATTRIBUTE_KEY).set(cookieListener);
channel.attr(HttpClientChannelContext.EXCEPTION_LISTENER_ATTRIBUTE_KEY).set(exceptionListener); channel.attr(HttpClientChannelContextDefaults.EXCEPTION_LISTENER_ATTRIBUTE_KEY).set(exceptionListener);
if (httpRequestContext.isFailed()) { if (httpRequestContext.isFailed()) {
logger.log(Level.FINE, () -> "detected fail, close channel"); logger.log(Level.FINE, () -> "detected fail, close channel");
future.cancel(true); future.cancel(true);
@ -314,7 +315,7 @@ public final class HttpClient implements Closeable {
exceptionListener.onException(illegalStateException); exceptionListener.onException(illegalStateException);
httpRequestContext.fail(illegalStateException.getMessage()); httpRequestContext.fail(illegalStateException.getMessage());
final ChannelPool channelPool = channelFuture.channel() final ChannelPool channelPool = channelFuture.channel()
.attr(HttpClientChannelContext.CHANNEL_POOL_ATTRIBUTE_KEY).get(); .attr(HttpClientChannelContextDefaults.CHANNEL_POOL_ATTRIBUTE_KEY).get();
channelPool.release(channelFuture.channel()); channelPool.release(channelFuture.channel());
} }
throw illegalStateException; throw illegalStateException;
@ -334,7 +335,7 @@ public final class HttpClient implements Closeable {
httpRequestContext.fail(illegalStateException.getMessage()); httpRequestContext.fail(illegalStateException.getMessage());
if (channelFuture != null) { if (channelFuture != null) {
final ChannelPool channelPool = channelFuture.channel() final ChannelPool channelPool = channelFuture.channel()
.attr(HttpClientChannelContext.CHANNEL_POOL_ATTRIBUTE_KEY).get(); .attr(HttpClientChannelContextDefaults.CHANNEL_POOL_ATTRIBUTE_KEY).get();
channelPool.release(channelFuture.channel()); channelPool.release(channelFuture.channel());
} }
} }
@ -347,7 +348,7 @@ public final class HttpClient implements Closeable {
httpRequestContext.fail(runtimeException.getMessage()); httpRequestContext.fail(runtimeException.getMessage());
if (channelFuture != null) { if (channelFuture != null) {
final ChannelPool channelPool = channelFuture.channel() final ChannelPool channelPool = channelFuture.channel()
.attr(HttpClientChannelContext.CHANNEL_POOL_ATTRIBUTE_KEY).get(); .attr(HttpClientChannelContextDefaults.CHANNEL_POOL_ATTRIBUTE_KEY).get();
channelPool.release(channelFuture.channel()); channelPool.release(channelFuture.channel());
} }
} }
@ -364,7 +365,7 @@ public final class HttpClient implements Closeable {
}); });
} }
boolean tryRedirect(Channel channel, FullHttpResponse httpResponse, HttpRequestContext httpRequestContext) public boolean tryRedirect(Channel channel, FullHttpResponse httpResponse, HttpRequestContext httpRequestContext)
throws IOException { throws IOException {
if (httpRequestContext.isFollowRedirect()) { if (httpRequestContext.isFollowRedirect()) {
String redirUrl = findRedirect(httpRequestContext, httpResponse); String redirUrl = findRedirect(httpRequestContext, httpResponse);
@ -376,7 +377,7 @@ public final class HttpClient implements Closeable {
} else { } else {
httpRequestContext.fail("too many redirections"); httpRequestContext.fail("too many redirections");
final ChannelPool channelPool = final ChannelPool channelPool =
channel.attr(HttpClientChannelContext.CHANNEL_POOL_ATTRIBUTE_KEY).get(); channel.attr(HttpClientChannelContextDefaults.CHANNEL_POOL_ATTRIBUTE_KEY).get();
channelPool.release(channel); channelPool.release(channel);
} }
return true; return true;

View file

@ -28,6 +28,7 @@ import io.netty.handler.proxy.Socks4ProxyHandler;
import io.netty.handler.proxy.Socks5ProxyHandler; import io.netty.handler.proxy.Socks5ProxyHandler;
import io.netty.handler.ssl.CipherSuiteFilter; import io.netty.handler.ssl.CipherSuiteFilter;
import io.netty.handler.ssl.SslProvider; import io.netty.handler.ssl.SslProvider;
import org.xbib.netty.http.client.internal.HttpClientThreadFactory;
import org.xbib.netty.http.client.util.ClientAuthMode; import org.xbib.netty.http.client.util.ClientAuthMode;
import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.TrustManagerFactory;

View file

@ -15,18 +15,11 @@
*/ */
package org.xbib.netty.http.client; package org.xbib.netty.http.client;
import io.netty.channel.pool.ChannelPool;
import io.netty.handler.proxy.HttpProxyHandler; import io.netty.handler.proxy.HttpProxyHandler;
import io.netty.handler.proxy.Socks4ProxyHandler; import io.netty.handler.proxy.Socks4ProxyHandler;
import io.netty.handler.proxy.Socks5ProxyHandler; import io.netty.handler.proxy.Socks5ProxyHandler;
import io.netty.handler.ssl.CipherSuiteFilter; import io.netty.handler.ssl.CipherSuiteFilter;
import io.netty.handler.ssl.SslProvider; import io.netty.handler.ssl.SslProvider;
import io.netty.util.AttributeKey;
import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.ExceptionListener;
import org.xbib.netty.http.client.listener.HttpHeadersListener;
import org.xbib.netty.http.client.listener.HttpPushListener;
import org.xbib.netty.http.client.listener.HttpResponseListener;
import org.xbib.netty.http.client.util.ClientAuthMode; import org.xbib.netty.http.client.util.ClientAuthMode;
import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.TrustManagerFactory;
@ -34,28 +27,7 @@ import java.io.InputStream;
/** /**
*/ */
final class HttpClientChannelContext { public final class HttpClientChannelContext {
static final AttributeKey<ChannelPool> CHANNEL_POOL_ATTRIBUTE_KEY =
AttributeKey.valueOf("httpClientChannelPool");
static final AttributeKey<HttpRequestContext> REQUEST_CONTEXT_ATTRIBUTE_KEY =
AttributeKey.valueOf("httpClientRequestContext");
static final AttributeKey<HttpResponseListener> RESPONSE_LISTENER_ATTRIBUTE_KEY =
AttributeKey.valueOf("httpClientResponseListener");
static final AttributeKey<HttpHeadersListener> HEADER_LISTENER_ATTRIBUTE_KEY =
AttributeKey.valueOf("httpHeaderListener");
static final AttributeKey<CookieListener> COOKIE_LISTENER_ATTRIBUTE_KEY =
AttributeKey.valueOf("cookieListener");
static final AttributeKey<HttpPushListener> PUSH_LISTENER_ATTRIBUTE_KEY =
AttributeKey.valueOf("pushListener");
static final AttributeKey<ExceptionListener> EXCEPTION_LISTENER_ATTRIBUTE_KEY =
AttributeKey.valueOf("httpClientExceptionListener");
private final int maxInitialLineLength; private final int maxInitialLineLength;
@ -139,83 +111,83 @@ final class HttpClientChannelContext {
this.socks5ProxyHandler = socks5ProxyHandler; this.socks5ProxyHandler = socks5ProxyHandler;
} }
int getMaxInitialLineLength() { public int getMaxInitialLineLength() {
return maxInitialLineLength; return maxInitialLineLength;
} }
int getMaxHeaderSize() { public int getMaxHeaderSize() {
return maxHeaderSize; return maxHeaderSize;
} }
int getMaxChunkSize() { public int getMaxChunkSize() {
return maxChunkSize; return maxChunkSize;
} }
int getMaxContentLength() { public int getMaxContentLength() {
return maxContentLength; return maxContentLength;
} }
int getMaxCompositeBufferComponents() { public int getMaxCompositeBufferComponents() {
return maxCompositeBufferComponents; return maxCompositeBufferComponents;
} }
int getReadTimeoutMillis() { public int getReadTimeoutMillis() {
return readTimeoutMillis; return readTimeoutMillis;
} }
boolean isGzipEnabled() { public boolean isGzipEnabled() {
return enableGzip; return enableGzip;
} }
boolean isInstallHttp2Upgrade() { public boolean isInstallHttp2Upgrade() {
return installHttp2Upgrade; return installHttp2Upgrade;
} }
SslProvider getSslProvider() { public SslProvider getSslProvider() {
return sslProvider; return sslProvider;
} }
Iterable<String> getCiphers() { public Iterable<String> getCiphers() {
return ciphers; return ciphers;
} }
CipherSuiteFilter getCipherSuiteFilter() { public CipherSuiteFilter getCipherSuiteFilter() {
return cipherSuiteFilter; return cipherSuiteFilter;
} }
TrustManagerFactory getTrustManagerFactory() { public TrustManagerFactory getTrustManagerFactory() {
return trustManagerFactory; return trustManagerFactory;
} }
InputStream getKeyCertChainInputStream() { public InputStream getKeyCertChainInputStream() {
return keyCertChainInputStream; return keyCertChainInputStream;
} }
InputStream getKeyInputStream() { public InputStream getKeyInputStream() {
return keyInputStream; return keyInputStream;
} }
String getKeyPassword() { public String getKeyPassword() {
return keyPassword; return keyPassword;
} }
boolean isUseServerNameIdentification() { public boolean isUseServerNameIdentification() {
return useServerNameIdentification; return useServerNameIdentification;
} }
ClientAuthMode getClientAuthMode() { public ClientAuthMode getClientAuthMode() {
return clientAuthMode; return clientAuthMode;
} }
HttpProxyHandler getHttpProxyHandler() { public HttpProxyHandler getHttpProxyHandler() {
return httpProxyHandler; return httpProxyHandler;
} }
Socks4ProxyHandler getSocks4ProxyHandler() { public Socks4ProxyHandler getSocks4ProxyHandler() {
return socks4ProxyHandler; return socks4ProxyHandler;
} }
Socks5ProxyHandler getSocks5ProxyHandler() { public Socks5ProxyHandler getSocks5ProxyHandler() {
return socks5ProxyHandler; return socks5ProxyHandler;
} }
} }

View file

@ -15,12 +15,19 @@
*/ */
package org.xbib.netty.http.client; package org.xbib.netty.http.client;
import io.netty.channel.pool.ChannelPool;
import io.netty.handler.codec.http2.Http2SecurityUtil; import io.netty.handler.codec.http2.Http2SecurityUtil;
import io.netty.handler.ssl.CipherSuiteFilter; import io.netty.handler.ssl.CipherSuiteFilter;
import io.netty.handler.ssl.OpenSsl; import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.SslProvider; import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.SupportedCipherSuiteFilter; import io.netty.handler.ssl.SupportedCipherSuiteFilter;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory; import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.AttributeKey;
import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.ExceptionListener;
import org.xbib.netty.http.client.listener.HttpHeadersListener;
import org.xbib.netty.http.client.listener.HttpPushListener;
import org.xbib.netty.http.client.listener.HttpResponseListener;
import org.xbib.netty.http.client.util.ClientAuthMode; import org.xbib.netty.http.client.util.ClientAuthMode;
import org.xbib.netty.http.client.util.InetAddressKey; import org.xbib.netty.http.client.util.InetAddressKey;
@ -30,6 +37,27 @@ import javax.net.ssl.TrustManagerFactory;
*/ */
public interface HttpClientChannelContextDefaults { public interface HttpClientChannelContextDefaults {
AttributeKey<ChannelPool> CHANNEL_POOL_ATTRIBUTE_KEY =
AttributeKey.valueOf("httpClientChannelPool");
AttributeKey<HttpRequestContext> REQUEST_CONTEXT_ATTRIBUTE_KEY =
AttributeKey.valueOf("httpClientRequestContext");
AttributeKey<HttpResponseListener> RESPONSE_LISTENER_ATTRIBUTE_KEY =
AttributeKey.valueOf("httpClientResponseListener");
AttributeKey<HttpHeadersListener> HEADER_LISTENER_ATTRIBUTE_KEY =
AttributeKey.valueOf("httpHeaderListener");
AttributeKey<CookieListener> COOKIE_LISTENER_ATTRIBUTE_KEY =
AttributeKey.valueOf("cookieListener");
AttributeKey<HttpPushListener> PUSH_LISTENER_ATTRIBUTE_KEY =
AttributeKey.valueOf("pushListener");
AttributeKey<ExceptionListener> EXCEPTION_LISTENER_ATTRIBUTE_KEY =
AttributeKey.valueOf("httpClientExceptionListener");
/** /**
* Default for TCP_NODELAY. * Default for TCP_NODELAY.
*/ */

View file

@ -1,3 +1,18 @@
/*
* Copyright 2017 Jörg Prante
*
* Jörg Prante licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.xbib.netty.http.client; package org.xbib.netty.http.client;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
@ -99,7 +114,7 @@ public class HttpClientRequestBuilder implements HttpRequestBuilder, HttpRequest
private HttpPushListener httpPushListener; private HttpPushListener httpPushListener;
HttpClientRequestBuilder(HttpMethod httpMethod, protected HttpClientRequestBuilder(HttpMethod httpMethod,
ByteBufAllocator byteBufAllocator, int streamId) { ByteBufAllocator byteBufAllocator, int streamId) {
this(null, httpMethod, byteBufAllocator, streamId); this(null, httpMethod, byteBufAllocator, streamId);
} }

View file

@ -16,6 +16,7 @@
package org.xbib.netty.http.client; package org.xbib.netty.http.client;
import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.HttpVersion;
import org.xbib.netty.http.client.internal.HttpClientUserAgent;
import java.net.URI; import java.net.URI;

View file

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package org.xbib.netty.http.client; package org.xbib.netty.http.client.handler;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufAllocator;
@ -37,6 +37,8 @@ import io.netty.handler.codec.http2.Http2LocalFlowController;
import io.netty.handler.codec.http2.Http2Settings; import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2Stream; import io.netty.handler.codec.http2.Http2Stream;
import io.netty.handler.codec.http2.HttpConversionUtil; import io.netty.handler.codec.http2.HttpConversionUtil;
import org.xbib.netty.http.client.HttpClientChannelContextDefaults;
import org.xbib.netty.http.client.HttpRequestContext;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -67,7 +69,7 @@ public class Http2EventHandler extends Http2EventAdapter {
* @param maxContentLength the maximum content length * @param maxContentLength the maximum content length
* @param validateHeaders true if headers should be validated * @param validateHeaders true if headers should be validated
*/ */
Http2EventHandler(Http2Connection connection, int maxContentLength, boolean validateHeaders) { public Http2EventHandler(Http2Connection connection, int maxContentLength, boolean validateHeaders) {
this.connection = connection; this.connection = connection;
this.maxContentLength = maxContentLength; this.maxContentLength = maxContentLength;
this.validateHttpHeaders = validateHeaders; this.validateHttpHeaders = validateHeaders;
@ -87,7 +89,7 @@ public class Http2EventHandler extends Http2EventAdapter {
logger.log(Level.FINEST, () -> "settings received " + settings); logger.log(Level.FINEST, () -> "settings received " + settings);
Channel channel = ctx.channel(); Channel channel = ctx.channel();
final HttpRequestContext httpRequestContext = final HttpRequestContext httpRequestContext =
channel.attr(HttpClientChannelContext.REQUEST_CONTEXT_ATTRIBUTE_KEY).get(); channel.attr(HttpClientChannelContextDefaults.REQUEST_CONTEXT_ATTRIBUTE_KEY).get();
final HttpRequest httpRequest = httpRequestContext.getHttpRequest(); final HttpRequest httpRequest = httpRequestContext.getHttpRequest();
ChannelPromise channelPromise = channel.newPromise(); ChannelPromise channelPromise = channel.newPromise();
Http2Headers headers = toHttp2Headers(httpRequestContext); Http2Headers headers = toHttp2Headers(httpRequestContext);
@ -242,7 +244,7 @@ public class Http2EventHandler extends Http2EventAdapter {
removeMessage(stream, true); removeMessage(stream, true);
} }
final HttpRequestContext httpRequestContext = final HttpRequestContext httpRequestContext =
ctx.channel().attr(HttpClientChannelContext.REQUEST_CONTEXT_ATTRIBUTE_KEY).get(); ctx.channel().attr(HttpClientChannelContextDefaults.REQUEST_CONTEXT_ATTRIBUTE_KEY).get();
httpRequestContext.getPushMap().remove(streamId); httpRequestContext.getPushMap().remove(streamId);
} }
@ -285,7 +287,7 @@ public class Http2EventHandler extends Http2EventAdapter {
} }
Channel channel = ctx.channel(); Channel channel = ctx.channel();
final HttpRequestContext httpRequestContext = final HttpRequestContext httpRequestContext =
channel.attr(HttpClientChannelContext.REQUEST_CONTEXT_ATTRIBUTE_KEY).get(); channel.attr(HttpClientChannelContextDefaults.REQUEST_CONTEXT_ATTRIBUTE_KEY).get();
httpRequestContext.receiveStreamID(promisedStreamId, headers, channel.newPromise()); httpRequestContext.receiveStreamID(promisedStreamId, headers, channel.newPromise());
} }

View file

@ -0,0 +1,65 @@
/*
* Copyright 2017 Jörg Prante
*
* Jörg Prante licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.xbib.netty.http.client.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandler;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
import java.util.logging.Level;
import java.util.logging.Logger;
import static org.xbib.netty.http.client.handler.HttpClientChannelInitializer.configureHttp1Pipeline;
import static org.xbib.netty.http.client.handler.HttpClientChannelInitializer.configureHttp2Pipeline;
import static org.xbib.netty.http.client.handler.HttpClientChannelInitializer.createHttp1ConnectionHandler;
import static org.xbib.netty.http.client.handler.HttpClientChannelInitializer.createHttp2ConnectionHandler;
/**
*
*/
class Http2NegotiationHandler extends ApplicationProtocolNegotiationHandler {
private static final Logger logger = Logger.getLogger(Http2NegotiationHandler.class.getName());
private final HttpClientChannelInitializer initializer;
Http2NegotiationHandler(String fallbackProtocol, HttpClientChannelInitializer initializer) {
super(fallbackProtocol);
this.initializer = initializer;
}
@Override
protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception {
if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
HttpToHttp2ConnectionHandler http2connectionHandler = createHttp2ConnectionHandler(initializer.getContext());
ctx.pipeline().addLast(http2connectionHandler);
configureHttp2Pipeline(ctx.pipeline(), initializer.getHttp2ResponseHandler());
logger.log(Level.FINE, () -> "negotiated HTTP/2: handler = " + ctx.pipeline().names());
return;
}
if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
HttpClientCodec http1connectionHandler = createHttp1ConnectionHandler(initializer.getContext());
ctx.pipeline().addLast(http1connectionHandler);
configureHttp1Pipeline(ctx.pipeline(), initializer.getContext(), initializer.getHttpHandler());
logger.log(Level.FINE, () -> "negotiated HTTP/1.1: handler = " + ctx.pipeline().names());
return;
}
ctx.close();
throw new IllegalStateException("unexpected protocol: " + protocol);
}
}

View file

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package org.xbib.netty.http.client; package org.xbib.netty.http.client.handler;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
@ -28,6 +28,9 @@ import io.netty.handler.codec.http.cookie.ClientCookieDecoder;
import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http2.Http2Headers; import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.HttpConversionUtil; import io.netty.handler.codec.http2.HttpConversionUtil;
import org.xbib.netty.http.client.HttpClient;
import org.xbib.netty.http.client.HttpClientChannelContextDefaults;
import org.xbib.netty.http.client.HttpRequestContext;
import org.xbib.netty.http.client.listener.CookieListener; import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.ExceptionListener; import org.xbib.netty.http.client.listener.ExceptionListener;
import org.xbib.netty.http.client.listener.HttpHeadersListener; import org.xbib.netty.http.client.listener.HttpHeadersListener;
@ -48,7 +51,7 @@ public class Http2ResponseHandler extends SimpleChannelInboundHandler<FullHttpRe
private final HttpClient httpClient; private final HttpClient httpClient;
Http2ResponseHandler(HttpClient httpClient) { public Http2ResponseHandler(HttpClient httpClient) {
this.httpClient = httpClient; this.httpClient = httpClient;
} }
@ -61,16 +64,16 @@ public class Http2ResponseHandler extends SimpleChannelInboundHandler<FullHttpRe
return; return;
} }
final HttpRequestContext httpRequestContext = final HttpRequestContext httpRequestContext =
ctx.channel().attr(HttpClientChannelContext.REQUEST_CONTEXT_ATTRIBUTE_KEY).get(); ctx.channel().attr(HttpClientChannelContextDefaults.REQUEST_CONTEXT_ATTRIBUTE_KEY).get();
HttpHeaders httpHeaders = httpResponse.headers(); HttpHeaders httpHeaders = httpResponse.headers();
HttpHeadersListener httpHeadersListener = HttpHeadersListener httpHeadersListener =
ctx.channel().attr(HttpClientChannelContext.HEADER_LISTENER_ATTRIBUTE_KEY).get(); ctx.channel().attr(HttpClientChannelContextDefaults.HEADER_LISTENER_ATTRIBUTE_KEY).get();
if (httpHeadersListener != null) { if (httpHeadersListener != null) {
logger.log(Level.FINE, () -> "firing onHeaders"); logger.log(Level.FINE, () -> "firing onHeaders");
httpHeadersListener.onHeaders(httpHeaders); httpHeadersListener.onHeaders(httpHeaders);
} }
CookieListener cookieListener = CookieListener cookieListener =
ctx.channel().attr(HttpClientChannelContext.COOKIE_LISTENER_ATTRIBUTE_KEY).get(); ctx.channel().attr(HttpClientChannelContextDefaults.COOKIE_LISTENER_ATTRIBUTE_KEY).get();
for (String cookieString : httpHeaders.getAll(HttpHeaderNames.SET_COOKIE)) { for (String cookieString : httpHeaders.getAll(HttpHeaderNames.SET_COOKIE)) {
Cookie cookie = ClientCookieDecoder.STRICT.decode(cookieString); Cookie cookie = ClientCookieDecoder.STRICT.decode(cookieString);
httpRequestContext.addCookie(cookie); httpRequestContext.addCookie(cookie);
@ -82,7 +85,7 @@ public class Http2ResponseHandler extends SimpleChannelInboundHandler<FullHttpRe
Entry<Http2Headers, ChannelPromise> pushEntry = httpRequestContext.getPushMap().get(streamId); Entry<Http2Headers, ChannelPromise> pushEntry = httpRequestContext.getPushMap().get(streamId);
if (pushEntry != null) { if (pushEntry != null) {
final HttpPushListener httpPushListener = final HttpPushListener httpPushListener =
ctx.channel().attr(HttpClientChannelContext.PUSH_LISTENER_ATTRIBUTE_KEY).get(); ctx.channel().attr(HttpClientChannelContextDefaults.PUSH_LISTENER_ATTRIBUTE_KEY).get();
if (httpPushListener != null) { if (httpPushListener != null) {
httpPushListener.onPushReceived(pushEntry.getKey(), httpResponse); httpPushListener.onPushReceived(pushEntry.getKey(), httpResponse);
} }
@ -98,7 +101,7 @@ public class Http2ResponseHandler extends SimpleChannelInboundHandler<FullHttpRe
Entry<ChannelFuture, ChannelPromise> promiseEntry = httpRequestContext.getStreamIdPromiseMap().get(streamId); Entry<ChannelFuture, ChannelPromise> promiseEntry = httpRequestContext.getStreamIdPromiseMap().get(streamId);
if (promiseEntry != null) { if (promiseEntry != null) {
final HttpResponseListener httpResponseListener = final HttpResponseListener httpResponseListener =
ctx.channel().attr(HttpClientChannelContext.RESPONSE_LISTENER_ATTRIBUTE_KEY).get(); ctx.channel().attr(HttpClientChannelContextDefaults.RESPONSE_LISTENER_ATTRIBUTE_KEY).get();
if (httpResponseListener != null) { if (httpResponseListener != null) {
httpResponseListener.onResponse(httpResponse); httpResponseListener.onResponse(httpResponse);
} }
@ -124,7 +127,7 @@ public class Http2ResponseHandler extends SimpleChannelInboundHandler<FullHttpRe
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.log(Level.FINE, ctx::toString); logger.log(Level.FINE, ctx::toString);
final ChannelPool channelPool = final ChannelPool channelPool =
ctx.channel().attr(HttpClientChannelContext.CHANNEL_POOL_ATTRIBUTE_KEY).get(); ctx.channel().attr(HttpClientChannelContextDefaults.CHANNEL_POOL_ATTRIBUTE_KEY).get();
channelPool.release(ctx.channel()); channelPool.release(ctx.channel());
} }
@ -132,15 +135,15 @@ public class Http2ResponseHandler extends SimpleChannelInboundHandler<FullHttpRe
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.log(Level.FINE, () -> "exception caught: " + cause); logger.log(Level.FINE, () -> "exception caught: " + cause);
ExceptionListener exceptionListener = ExceptionListener exceptionListener =
ctx.channel().attr(HttpClientChannelContext.EXCEPTION_LISTENER_ATTRIBUTE_KEY).get(); ctx.channel().attr(HttpClientChannelContextDefaults.EXCEPTION_LISTENER_ATTRIBUTE_KEY).get();
if (exceptionListener != null) { if (exceptionListener != null) {
exceptionListener.onException(cause); exceptionListener.onException(cause);
} }
final HttpRequestContext httpRequestContext = final HttpRequestContext httpRequestContext =
ctx.channel().attr(HttpClientChannelContext.REQUEST_CONTEXT_ATTRIBUTE_KEY).get(); ctx.channel().attr(HttpClientChannelContextDefaults.REQUEST_CONTEXT_ATTRIBUTE_KEY).get();
httpRequestContext.fail(cause.getMessage()); httpRequestContext.fail(cause.getMessage());
final ChannelPool channelPool = final ChannelPool channelPool =
ctx.channel().attr(HttpClientChannelContext.CHANNEL_POOL_ATTRIBUTE_KEY).get(); ctx.channel().attr(HttpClientChannelContextDefaults.CHANNEL_POOL_ATTRIBUTE_KEY).get();
channelPool.release(ctx.channel()); channelPool.release(ctx.channel());
} }
} }

View file

@ -13,41 +13,29 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package org.xbib.netty.http.client; package org.xbib.netty.http.client.handler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpClientCodec; import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpClientUpgradeHandler; import io.netty.handler.codec.http.HttpClientUpgradeHandler;
import io.netty.handler.codec.http.HttpContentDecompressor; import io.netty.handler.codec.http.HttpContentDecompressor;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http2.DefaultHttp2Connection; import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener; import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener;
import io.netty.handler.codec.http2.Http2ClientUpgradeCodec; import io.netty.handler.codec.http2.Http2ClientUpgradeCodec;
import io.netty.handler.codec.http2.Http2Connection; import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionPrefaceWrittenEvent;
import io.netty.handler.codec.http2.Http2FrameLogger; import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandler; import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandler;
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandlerBuilder; import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandlerBuilder;
import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.ApplicationProtocolConfig; import io.netty.handler.ssl.ApplicationProtocolConfig;
import io.netty.handler.ssl.ApplicationProtocolNames; import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
import io.netty.handler.ssl.SslCloseCompletionEvent;
import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.handler.timeout.ReadTimeoutHandler;
import org.xbib.netty.http.client.listener.ExceptionListener; import org.xbib.netty.http.client.HttpClientChannelContext;
import org.xbib.netty.http.client.util.InetAddressKey; import org.xbib.netty.http.client.util.InetAddressKey;
import javax.net.ssl.SNIHostName; import javax.net.ssl.SNIHostName;
@ -63,7 +51,7 @@ import java.util.logging.Logger;
/** /**
* Netty HTTP client channel initializer. * Netty HTTP client channel initializer.
*/ */
class HttpClientChannelInitializer extends ChannelInitializer<SocketChannel> { public class HttpClientChannelInitializer extends ChannelInitializer<SocketChannel> {
private static final Logger logger = Logger.getLogger(HttpClientChannelInitializer.class.getName()); private static final Logger logger = Logger.getLogger(HttpClientChannelInitializer.class.getName());
@ -81,13 +69,25 @@ class HttpClientChannelInitializer extends ChannelInitializer<SocketChannel> {
* @param httpHandler the HTTP 1.x handler * @param httpHandler the HTTP 1.x handler
* @param http2ResponseHandler the HTTP 2 handler * @param http2ResponseHandler the HTTP 2 handler
*/ */
HttpClientChannelInitializer(HttpClientChannelContext context, HttpHandler httpHandler, public HttpClientChannelInitializer(HttpClientChannelContext context, HttpHandler httpHandler,
Http2ResponseHandler http2ResponseHandler) { Http2ResponseHandler http2ResponseHandler) {
this.context = context; this.context = context;
this.httpHandler = httpHandler; this.httpHandler = httpHandler;
this.http2ResponseHandler = http2ResponseHandler; this.http2ResponseHandler = http2ResponseHandler;
} }
HttpClientChannelContext getContext() {
return context;
}
HttpHandler getHttpHandler() {
return httpHandler;
}
Http2ResponseHandler getHttp2ResponseHandler() {
return http2ResponseHandler;
}
/** /**
* Sets up a {@link InetAddressKey} for the channel initialization and initializes the channel. * Sets up a {@link InetAddressKey} for the channel initialization and initializes the channel.
* Using this method, the channel initializer can handle secure channels, the HTTP protocol version, * Using this method, the channel initializer can handle secure channels, the HTTP protocol version,
@ -96,7 +96,7 @@ class HttpClientChannelInitializer extends ChannelInitializer<SocketChannel> {
* @param key the key of the internet address * @param key the key of the internet address
* @throws Exception if channel * @throws Exception if channel
*/ */
void initChannel(SocketChannel ch, InetAddressKey key) throws Exception { public void initChannel(SocketChannel ch, InetAddressKey key) throws Exception {
this.key = key; this.key = key;
initChannel(ch); initChannel(ch);
} }
@ -130,14 +130,14 @@ class HttpClientChannelInitializer extends ChannelInitializer<SocketChannel> {
private void configureClearText(SocketChannel ch) { private void configureClearText(SocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline(); ChannelPipeline pipeline = ch.pipeline();
if (key.getVersion().majorVersion() == 1) { if (key.getVersion().majorVersion() == 1) {
HttpClientCodec http1connectionHandler = createHttp1ConnectionHandler(); HttpClientCodec http1connectionHandler = createHttp1ConnectionHandler(context);
pipeline.addLast(http1connectionHandler); pipeline.addLast(http1connectionHandler);
configureHttp1Pipeline(pipeline); configureHttp1Pipeline(pipeline, context, httpHandler);
} else if (key.getVersion().majorVersion() == 2) { } else if (key.getVersion().majorVersion() == 2) {
HttpToHttp2ConnectionHandler http2connectionHandler = createHttp2ConnectionHandler(); HttpToHttp2ConnectionHandler http2connectionHandler = createHttp2ConnectionHandler(context);
// using the upgrade handler means mixed HTTP 1 and HTTP 2 on the same connection // using the upgrade handler means mixed HTTP 1 and HTTP 2 on the same connection
if (context.isInstallHttp2Upgrade()) { if (context.isInstallHttp2Upgrade()) {
HttpClientCodec http1connectionHandler = createHttp1ConnectionHandler(); HttpClientCodec http1connectionHandler = createHttp1ConnectionHandler(context);
Http2ClientUpgradeCodec upgradeCodec = Http2ClientUpgradeCodec upgradeCodec =
new Http2ClientUpgradeCodec(http2connectionHandler); new Http2ClientUpgradeCodec(http2connectionHandler);
HttpClientUpgradeHandler upgradeHandler = HttpClientUpgradeHandler upgradeHandler =
@ -149,8 +149,8 @@ class HttpClientChannelInitializer extends ChannelInitializer<SocketChannel> {
} else { } else {
pipeline.addLast(http2connectionHandler); pipeline.addLast(http2connectionHandler);
} }
configureHttp2Pipeline(pipeline); configureHttp2Pipeline(pipeline, http2ResponseHandler);
configureHttp1Pipeline(pipeline); configureHttp1Pipeline(pipeline, context, httpHandler);
} }
} }
@ -192,15 +192,15 @@ class HttpClientChannelInitializer extends ChannelInitializer<SocketChannel> {
break; break;
} }
if (key.getVersion().majorVersion() == 1) { if (key.getVersion().majorVersion() == 1) {
HttpClientCodec http1connectionHandler = createHttp1ConnectionHandler(); HttpClientCodec http1connectionHandler = createHttp1ConnectionHandler(context);
pipeline.addLast(http1connectionHandler); pipeline.addLast(http1connectionHandler);
configureHttp1Pipeline(pipeline); configureHttp1Pipeline(pipeline, context, httpHandler);
} else if (key.getVersion().majorVersion() == 2) { } else if (key.getVersion().majorVersion() == 2) {
pipeline.addLast(new Http2NegotiationHandler(ApplicationProtocolNames.HTTP_1_1)); pipeline.addLast(new Http2NegotiationHandler(ApplicationProtocolNames.HTTP_1_1, this));
} }
} }
private void configureHttp1Pipeline(ChannelPipeline pipeline) { static void configureHttp1Pipeline(ChannelPipeline pipeline, HttpClientChannelContext context, HttpHandler httpHandler) {
if (context.isGzipEnabled()) { if (context.isGzipEnabled()) {
pipeline.addLast(new HttpContentDecompressor()); pipeline.addLast(new HttpContentDecompressor());
} }
@ -211,16 +211,16 @@ class HttpClientChannelInitializer extends ChannelInitializer<SocketChannel> {
pipeline.addLast(httpHandler); pipeline.addLast(httpHandler);
} }
private void configureHttp2Pipeline(ChannelPipeline pipeline) { static void configureHttp2Pipeline(ChannelPipeline pipeline, Http2ResponseHandler http2ResponseHandler) {
pipeline.addLast(new UserEventLogger()); pipeline.addLast(new UserEventLogger());
pipeline.addLast(http2ResponseHandler); pipeline.addLast(http2ResponseHandler);
} }
private HttpClientCodec createHttp1ConnectionHandler() { static HttpClientCodec createHttp1ConnectionHandler(HttpClientChannelContext context) {
return new HttpClientCodec(context.getMaxInitialLineLength(), context.getMaxHeaderSize(), context.getMaxChunkSize()); return new HttpClientCodec(context.getMaxInitialLineLength(), context.getMaxHeaderSize(), context.getMaxChunkSize());
} }
private HttpToHttp2ConnectionHandler createHttp2ConnectionHandler() { static HttpToHttp2ConnectionHandler createHttp2ConnectionHandler(HttpClientChannelContext context) {
final Http2Connection http2Connection = new DefaultHttp2Connection(false); final Http2Connection http2Connection = new DefaultHttp2Connection(false);
return new HttpToHttp2ConnectionHandlerBuilder() return new HttpToHttp2ConnectionHandlerBuilder()
.connection(http2Connection) .connection(http2Connection)
@ -229,124 +229,4 @@ class HttpClientChannelInitializer extends ChannelInitializer<SocketChannel> {
new Http2EventHandler(http2Connection, context.getMaxContentLength(), false))) new Http2EventHandler(http2Connection, context.getMaxContentLength(), false)))
.build(); .build();
} }
private class Http2NegotiationHandler extends ApplicationProtocolNegotiationHandler {
Http2NegotiationHandler(String fallbackProtocol) {
super(fallbackProtocol);
}
@Override
protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception {
if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
HttpToHttp2ConnectionHandler http2connectionHandler = createHttp2ConnectionHandler();
ctx.pipeline().addLast(http2connectionHandler);
configureHttp2Pipeline(ctx.pipeline());
logger.log(Level.FINE, () -> "negotiated HTTP/2: handler = " + ctx.pipeline().names());
return;
}
if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
HttpClientCodec http1connectionHandler = createHttp1ConnectionHandler();
ctx.pipeline().addLast(http1connectionHandler);
configureHttp1Pipeline(ctx.pipeline());
logger.log(Level.FINE, () -> "negotiated HTTP/1.1: handler = " + ctx.pipeline().names());
return;
}
// close and fail
ctx.close();
throw new IllegalStateException("unexpected protocol: " + protocol);
}
}
@Sharable
private static class UpgradeRequestHandler extends ChannelInboundHandlerAdapter {
/**
* Send an upgrade request if channel becomes active.
* @param ctx the channel handler context
* @throws Exception if upgrade request sending fails
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
DefaultFullHttpRequest upgradeRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
ctx.writeAndFlush(upgradeRequest);
super.channelActive(ctx);
ctx.pipeline().remove(this);
logger.log(Level.FINE, () -> "upgrade request handler removed, pipeline = " + ctx.pipeline().names());
}
/**
* Forward channel exceptions to the exception listener.
* @param ctx the channel handler context
* @param cause the cause of the exception
* @throws Exception if forwarding fails
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.log(Level.FINE, () -> "exceptionCaught " + cause.getMessage());
ExceptionListener exceptionListener =
ctx.channel().attr(HttpClientChannelContext.EXCEPTION_LISTENER_ATTRIBUTE_KEY).get();
if (exceptionListener != null) {
exceptionListener.onException(cause);
}
final HttpRequestContext httpRequestContext =
ctx.channel().attr(HttpClientChannelContext.REQUEST_CONTEXT_ATTRIBUTE_KEY).get();
httpRequestContext.fail(cause.getMessage());
}
}
/**
* A Netty handler that logs user events and find expetced ones.
*/
@Sharable
private static class UserEventLogger extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
logger.log(Level.FINE, () -> "got user event " + evt);
if (evt instanceof Http2ConnectionPrefaceWrittenEvent ||
evt instanceof SslCloseCompletionEvent ||
evt instanceof ChannelInputShutdownReadComplete) {
// log expected events
logger.log(Level.FINE, () -> "user event is expected: " + evt);
return;
}
super.userEventTriggered(ctx, evt);
}
}
/**
* A Netty handler that logs the I/O traffic of a connection.
*/
@Sharable
private static class TrafficLoggingHandler extends LoggingHandler {
TrafficLoggingHandler() {
super("client", LogLevel.TRACE);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof ByteBuf && !((ByteBuf) msg).isReadable()) {
ctx.write(msg, promise);
} else {
super.write(ctx, msg, promise);
}
}
}
} }

View file

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package org.xbib.netty.http.client; package org.xbib.netty.http.client.handler;
import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
@ -24,6 +24,9 @@ import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.cookie.ClientCookieDecoder; import io.netty.handler.codec.http.cookie.ClientCookieDecoder;
import io.netty.handler.codec.http.cookie.Cookie; import io.netty.handler.codec.http.cookie.Cookie;
import org.xbib.netty.http.client.HttpClient;
import org.xbib.netty.http.client.HttpClientChannelContextDefaults;
import org.xbib.netty.http.client.HttpRequestContext;
import org.xbib.netty.http.client.listener.CookieListener; import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.ExceptionListener; import org.xbib.netty.http.client.listener.ExceptionListener;
import org.xbib.netty.http.client.listener.HttpHeadersListener; import org.xbib.netty.http.client.listener.HttpHeadersListener;
@ -36,13 +39,13 @@ import java.util.logging.Logger;
* HTTP 1.x Netty channel handler. * HTTP 1.x Netty channel handler.
*/ */
@ChannelHandler.Sharable @ChannelHandler.Sharable
final class HttpHandler extends ChannelInboundHandlerAdapter { public final class HttpHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = Logger.getLogger(HttpHandler.class.getName()); private static final Logger logger = Logger.getLogger(HttpHandler.class.getName());
private final HttpClient httpClient; private final HttpClient httpClient;
HttpHandler(HttpClient httpClient) { public HttpHandler(HttpClient httpClient) {
this.httpClient = httpClient; this.httpClient = httpClient;
} }
@ -57,18 +60,18 @@ final class HttpHandler extends ChannelInboundHandlerAdapter {
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
logger.log(Level.FINE, () -> "channelRead msg " + msg.getClass().getName()); logger.log(Level.FINE, () -> "channelRead msg " + msg.getClass().getName());
final HttpRequestContext httpRequestContext = final HttpRequestContext httpRequestContext =
ctx.channel().attr(HttpClientChannelContext.REQUEST_CONTEXT_ATTRIBUTE_KEY).get(); ctx.channel().attr(HttpClientChannelContextDefaults.REQUEST_CONTEXT_ATTRIBUTE_KEY).get();
if (msg instanceof FullHttpResponse) { if (msg instanceof FullHttpResponse) {
FullHttpResponse httpResponse = (FullHttpResponse) msg; FullHttpResponse httpResponse = (FullHttpResponse) msg;
HttpHeaders httpHeaders = httpResponse.headers(); HttpHeaders httpHeaders = httpResponse.headers();
HttpHeadersListener httpHeadersListener = HttpHeadersListener httpHeadersListener =
ctx.channel().attr(HttpClientChannelContext.HEADER_LISTENER_ATTRIBUTE_KEY).get(); ctx.channel().attr(HttpClientChannelContextDefaults.HEADER_LISTENER_ATTRIBUTE_KEY).get();
if (httpHeadersListener != null) { if (httpHeadersListener != null) {
logger.log(Level.FINE, () -> "firing onHeaders"); logger.log(Level.FINE, () -> "firing onHeaders");
httpHeadersListener.onHeaders(httpHeaders); httpHeadersListener.onHeaders(httpHeaders);
} }
CookieListener cookieListener = CookieListener cookieListener =
ctx.channel().attr(HttpClientChannelContext.COOKIE_LISTENER_ATTRIBUTE_KEY).get(); ctx.channel().attr(HttpClientChannelContextDefaults.COOKIE_LISTENER_ATTRIBUTE_KEY).get();
for (String cookieString : httpHeaders.getAll(HttpHeaderNames.SET_COOKIE)) { for (String cookieString : httpHeaders.getAll(HttpHeaderNames.SET_COOKIE)) {
Cookie cookie = ClientCookieDecoder.STRICT.decode(cookieString); Cookie cookie = ClientCookieDecoder.STRICT.decode(cookieString);
httpRequestContext.addCookie(cookie); httpRequestContext.addCookie(cookie);
@ -78,7 +81,7 @@ final class HttpHandler extends ChannelInboundHandlerAdapter {
} }
} }
HttpResponseListener httpResponseListener = HttpResponseListener httpResponseListener =
ctx.channel().attr(HttpClientChannelContext.RESPONSE_LISTENER_ATTRIBUTE_KEY).get(); ctx.channel().attr(HttpClientChannelContextDefaults.RESPONSE_LISTENER_ATTRIBUTE_KEY).get();
if (httpResponseListener != null) { if (httpResponseListener != null) {
logger.log(Level.FINE, () -> "firing onResponse"); logger.log(Level.FINE, () -> "firing onResponse");
httpResponseListener.onResponse(httpResponse); httpResponseListener.onResponse(httpResponse);
@ -89,7 +92,7 @@ final class HttpHandler extends ChannelInboundHandlerAdapter {
} }
httpRequestContext.success("response finished"); httpRequestContext.success("response finished");
final ChannelPool channelPool = final ChannelPool channelPool =
ctx.channel().attr(HttpClientChannelContext.CHANNEL_POOL_ATTRIBUTE_KEY).get(); ctx.channel().attr(HttpClientChannelContextDefaults.CHANNEL_POOL_ATTRIBUTE_KEY).get();
channelPool.release(ctx.channel()); channelPool.release(ctx.channel());
} }
} }
@ -98,12 +101,12 @@ final class HttpHandler extends ChannelInboundHandlerAdapter {
public void channelInactive(ChannelHandlerContext ctx) throws Exception { public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.log(Level.FINE, () -> "channelInactive " + ctx); logger.log(Level.FINE, () -> "channelInactive " + ctx);
final HttpRequestContext httpRequestContext = final HttpRequestContext httpRequestContext =
ctx.channel().attr(HttpClientChannelContext.REQUEST_CONTEXT_ATTRIBUTE_KEY).get(); ctx.channel().attr(HttpClientChannelContextDefaults.REQUEST_CONTEXT_ATTRIBUTE_KEY).get();
if (httpRequestContext.getRedirectCount().get() == 0 && !httpRequestContext.isSucceeded()) { if (httpRequestContext.getRedirectCount().get() == 0 && !httpRequestContext.isSucceeded()) {
httpRequestContext.fail("channel inactive"); httpRequestContext.fail("channel inactive");
} }
final ChannelPool channelPool = final ChannelPool channelPool =
ctx.channel().attr(HttpClientChannelContext.CHANNEL_POOL_ATTRIBUTE_KEY).get(); ctx.channel().attr(HttpClientChannelContextDefaults.CHANNEL_POOL_ATTRIBUTE_KEY).get();
channelPool.release(ctx.channel()); channelPool.release(ctx.channel());
} }
@ -116,16 +119,16 @@ final class HttpHandler extends ChannelInboundHandlerAdapter {
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ExceptionListener exceptionListener = ExceptionListener exceptionListener =
ctx.channel().attr(HttpClientChannelContext.EXCEPTION_LISTENER_ATTRIBUTE_KEY).get(); ctx.channel().attr(HttpClientChannelContextDefaults.EXCEPTION_LISTENER_ATTRIBUTE_KEY).get();
logger.log(Level.FINE, () -> "exceptionCaught"); logger.log(Level.FINE, () -> "exceptionCaught");
if (exceptionListener != null) { if (exceptionListener != null) {
exceptionListener.onException(cause); exceptionListener.onException(cause);
} }
final HttpRequestContext httpRequestContext = final HttpRequestContext httpRequestContext =
ctx.channel().attr(HttpClientChannelContext.REQUEST_CONTEXT_ATTRIBUTE_KEY).get(); ctx.channel().attr(HttpClientChannelContextDefaults.REQUEST_CONTEXT_ATTRIBUTE_KEY).get();
httpRequestContext.fail(cause.getMessage()); httpRequestContext.fail(cause.getMessage());
final ChannelPool channelPool = final ChannelPool channelPool =
ctx.channel().attr(HttpClientChannelContext.CHANNEL_POOL_ATTRIBUTE_KEY).get(); ctx.channel().attr(HttpClientChannelContextDefaults.CHANNEL_POOL_ATTRIBUTE_KEY).get();
channelPool.release(ctx.channel()); channelPool.release(ctx.channel());
} }
} }

View file

@ -0,0 +1,58 @@
/*
* Copyright 2017 Jörg Prante
*
* Jörg Prante licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.xbib.netty.http.client.handler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
* A Netty handler that logs the I/O traffic of a connection.
*/
@ChannelHandler.Sharable
class TrafficLoggingHandler extends LoggingHandler {
TrafficLoggingHandler() {
super("client", LogLevel.TRACE);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof ByteBuf && !((ByteBuf) msg).isReadable()) {
ctx.write(msg, promise);
} else {
super.write(ctx, msg, promise);
}
}
}

View file

@ -0,0 +1,71 @@
/*
* Copyright 2017 Jörg Prante
*
* Jörg Prante licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.xbib.netty.http.client.handler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import org.xbib.netty.http.client.HttpClientChannelContextDefaults;
import org.xbib.netty.http.client.HttpRequestContext;
import org.xbib.netty.http.client.listener.ExceptionListener;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
*
*/
@ChannelHandler.Sharable
class UpgradeRequestHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = Logger.getLogger(UpgradeRequestHandler.class.getName());
/**
* Send an upgrade request if channel becomes active.
* @param ctx the channel handler context
* @throws Exception if upgrade request sending fails
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
DefaultFullHttpRequest upgradeRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
ctx.writeAndFlush(upgradeRequest);
super.channelActive(ctx);
ctx.pipeline().remove(this);
logger.log(Level.FINE, () -> "upgrade request handler removed, pipeline = " + ctx.pipeline().names());
}
/**
* Forward channel exceptions to the exception listener.
* @param ctx the channel handler context
* @param cause the cause of the exception
* @throws Exception if forwarding fails
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.log(Level.FINE, () -> "exceptionCaught " + cause.getMessage());
ExceptionListener exceptionListener =
ctx.channel().attr(HttpClientChannelContextDefaults.EXCEPTION_LISTENER_ATTRIBUTE_KEY).get();
if (exceptionListener != null) {
exceptionListener.onException(cause);
}
final HttpRequestContext httpRequestContext =
ctx.channel().attr(HttpClientChannelContextDefaults.REQUEST_CONTEXT_ATTRIBUTE_KEY).get();
httpRequestContext.fail(cause.getMessage());
}
}

View file

@ -0,0 +1,48 @@
/*
* Copyright 2017 Jörg Prante
*
* Jörg Prante licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.xbib.netty.http.client.handler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.handler.codec.http2.Http2ConnectionPrefaceWrittenEvent;
import io.netty.handler.ssl.SslCloseCompletionEvent;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* A Netty handler that logs user events and find expetced ones.
*/
@ChannelHandler.Sharable
class UserEventLogger extends ChannelInboundHandlerAdapter {
private static final Logger logger = Logger.getLogger(UserEventLogger.class.getName());
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
logger.log(Level.FINE, () -> "got user event " + evt);
if (evt instanceof Http2ConnectionPrefaceWrittenEvent ||
evt instanceof SslCloseCompletionEvent ||
evt instanceof ChannelInputShutdownReadComplete) {
// log expected events
logger.log(Level.FINE, () -> "user event is expected: " + evt);
return;
}
super.userEventTriggered(ctx, evt);
}
}

View file

@ -0,0 +1,4 @@
/**
* Handlers for Netty HTTP client.
*/
package org.xbib.netty.http.client.handler;

View file

@ -13,11 +13,12 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package org.xbib.netty.http.client; package org.xbib.netty.http.client.internal;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.pool.ChannelPoolHandler; import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import org.xbib.netty.http.client.handler.HttpClientChannelInitializer;
import org.xbib.netty.http.client.util.InetAddressKey; import org.xbib.netty.http.client.util.InetAddressKey;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -39,7 +40,7 @@ public class HttpClientChannelPoolHandler implements ChannelPoolHandler {
private int peak; private int peak;
HttpClientChannelPoolHandler(HttpClientChannelInitializer channelInitializer, InetAddressKey key) { public HttpClientChannelPoolHandler(HttpClientChannelInitializer channelInitializer, InetAddressKey key) {
this.channelInitializer = channelInitializer; this.channelInitializer = channelInitializer;
this.key = key; this.key = key;
} }

View file

@ -13,11 +13,16 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package org.xbib.netty.http.client; package org.xbib.netty.http.client.internal;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.pool.AbstractChannelPoolMap; import io.netty.channel.pool.AbstractChannelPoolMap;
import io.netty.channel.pool.FixedChannelPool; import io.netty.channel.pool.FixedChannelPool;
import org.xbib.netty.http.client.HttpClient;
import org.xbib.netty.http.client.HttpClientChannelContext;
import org.xbib.netty.http.client.handler.Http2ResponseHandler;
import org.xbib.netty.http.client.handler.HttpClientChannelInitializer;
import org.xbib.netty.http.client.handler.HttpHandler;
import org.xbib.netty.http.client.util.InetAddressKey; import org.xbib.netty.http.client.util.InetAddressKey;
/** /**
@ -37,7 +42,7 @@ public class HttpClientChannelPoolMap extends AbstractChannelPoolMap<InetAddress
private HttpClientChannelPoolHandler httpClientChannelPoolHandler; private HttpClientChannelPoolHandler httpClientChannelPoolHandler;
HttpClientChannelPoolMap(HttpClient httpClient, public HttpClientChannelPoolMap(HttpClient httpClient,
HttpClientChannelContext httpClientChannelContext, HttpClientChannelContext httpClientChannelContext,
Bootstrap bootstrap, Bootstrap bootstrap,
int maxConnections) { int maxConnections) {

View file

@ -13,11 +13,14 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package org.xbib.netty.http.client; package org.xbib.netty.http.client.internal;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
class HttpClientThreadFactory implements ThreadFactory { /**
*
*/
public class HttpClientThreadFactory implements ThreadFactory {
private int number = 0; private int number = 0;

View file

@ -13,9 +13,10 @@
* License for the specific language governing permissions and limitations * License for the specific language governing permissions and limitations
* under the License. * under the License.
*/ */
package org.xbib.netty.http.client; package org.xbib.netty.http.client.internal;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import org.xbib.netty.http.client.HttpClient;
import java.util.Optional; import java.util.Optional;

View file

@ -0,0 +1,4 @@
/**
* Internal classes for Netty HTTP client.
*/
package org.xbib.netty.http.client.internal;