much improved HTTP/2, cookies, more tests

This commit is contained in:
Jörg Prante 2017-05-11 23:52:00 +02:00
parent 4ca2ff395d
commit 8f1a7211e6
44 changed files with 2413 additions and 953 deletions

View file

@ -65,8 +65,6 @@ jar {
test {
jvmArgs "-javaagent:" + configurations.alpnagent.asPath
include 'org/xbib/netty/http/client/test/Http2FrameAdapterTest*'
include 'org/xbib/netty/http/client/test/InboundHttp2ToHttpAdapterTest*'
testLogging {
showStandardStreams = false
exceptionFormat = 'full'

Binary file not shown.

View file

@ -1,4 +1,4 @@
#Mon Apr 17 15:12:33 CEST 2017
#Tue May 02 21:00:09 CEST 2017
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME

172
gradlew vendored Executable file
View file

@ -0,0 +1,172 @@
#!/usr/bin/env sh
##############################################################################
##
## Gradle start up script for UN*X
##
##############################################################################
# Attempt to set APP_HOME
# Resolve links: $0 may be a link
PRG="$0"
# Need this for relative symlinks.
while [ -h "$PRG" ] ; do
ls=`ls -ld "$PRG"`
link=`expr "$ls" : '.*-> \(.*\)$'`
if expr "$link" : '/.*' > /dev/null; then
PRG="$link"
else
PRG=`dirname "$PRG"`"/$link"
fi
done
SAVED="`pwd`"
cd "`dirname \"$PRG\"`/" >/dev/null
APP_HOME="`pwd -P`"
cd "$SAVED" >/dev/null
APP_NAME="Gradle"
APP_BASE_NAME=`basename "$0"`
# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
DEFAULT_JVM_OPTS=""
# Use the maximum available, or set MAX_FD != -1 to use that value.
MAX_FD="maximum"
warn ( ) {
echo "$*"
}
die ( ) {
echo
echo "$*"
echo
exit 1
}
# OS specific support (must be 'true' or 'false').
cygwin=false
msys=false
darwin=false
nonstop=false
case "`uname`" in
CYGWIN* )
cygwin=true
;;
Darwin* )
darwin=true
;;
MINGW* )
msys=true
;;
NONSTOP* )
nonstop=true
;;
esac
CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar
# Determine the Java command to use to start the JVM.
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
# IBM's JDK on AIX uses strange locations for the executables
JAVACMD="$JAVA_HOME/jre/sh/java"
else
JAVACMD="$JAVA_HOME/bin/java"
fi
if [ ! -x "$JAVACMD" ] ; then
die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
else
JAVACMD="java"
which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
Please set the JAVA_HOME variable in your environment to match the
location of your Java installation."
fi
# Increase the maximum file descriptors if we can.
if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then
MAX_FD_LIMIT=`ulimit -H -n`
if [ $? -eq 0 ] ; then
if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then
MAX_FD="$MAX_FD_LIMIT"
fi
ulimit -n $MAX_FD
if [ $? -ne 0 ] ; then
warn "Could not set maximum file descriptor limit: $MAX_FD"
fi
else
warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT"
fi
fi
# For Darwin, add options to specify how the application appears in the dock
if $darwin; then
GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\""
fi
# For Cygwin, switch paths to Windows format before running java
if $cygwin ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`
JAVACMD=`cygpath --unix "$JAVACMD"`
# We build the pattern for arguments to be converted via cygpath
ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null`
SEP=""
for dir in $ROOTDIRSRAW ; do
ROOTDIRS="$ROOTDIRS$SEP$dir"
SEP="|"
done
OURCYGPATTERN="(^($ROOTDIRS))"
# Add a user-defined pattern to the cygpath arguments
if [ "$GRADLE_CYGPATTERN" != "" ] ; then
OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)"
fi
# Now convert the arguments - kludge to limit ourselves to /bin/sh
i=0
for arg in "$@" ; do
CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -`
CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option
if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition
eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"`
else
eval `echo args$i`="\"$arg\""
fi
i=$((i+1))
done
case $i in
(0) set -- ;;
(1) set -- "$args0" ;;
(2) set -- "$args0" "$args1" ;;
(3) set -- "$args0" "$args1" "$args2" ;;
(4) set -- "$args0" "$args1" "$args2" "$args3" ;;
(5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;;
(6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;;
(7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;;
(8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;;
(9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;;
esac
fi
# Escape application args
save ( ) {
for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done
echo " "
}
APP_ARGS=$(save "$@")
# Collect all arguments for the java command, following the shell quoting and substitution rules
eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS"
# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong
if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then
cd "$(dirname "$0")"
fi
exec "$JAVACMD" "$@"

84
gradlew.bat vendored Normal file
View file

@ -0,0 +1,84 @@
@if "%DEBUG%" == "" @echo off
@rem ##########################################################################
@rem
@rem Gradle startup script for Windows
@rem
@rem ##########################################################################
@rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal
set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS=
@rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome
set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1
if "%ERRORLEVEL%" == "0" goto init
echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:findJavaFromJavaHome
set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe
if exist "%JAVA_EXE%" goto init
echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
echo.
echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation.
goto fail
:init
@rem Get command-line arguments, handling Windows variants
if not "%OS%" == "Windows_NT" goto win9xME_args
:win9xME_args
@rem Slurp the command line arguments.
set CMD_LINE_ARGS=
set _SKIP=2
:win9xME_args_slurp
if "x%~1" == "x" goto execute
set CMD_LINE_ARGS=%*
:execute
@rem Setup the command line
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%
:end
@rem End local scope for the variables with windows NT shell
if "%ERRORLEVEL%"=="0" goto mainEnd
:fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code!
if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1
exit /b 1
:mainEnd
if "%OS%"=="Windows_NT" endlocal
:omega

View file

@ -0,0 +1,412 @@
/*
* Copyright 2017 Jörg Prante
*
* Jörg Prante licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.xbib.netty.http.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpMessage;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionHandler;
import io.netty.handler.codec.http2.Http2Error;
import io.netty.handler.codec.http2.Http2EventAdapter;
import io.netty.handler.codec.http2.Http2Exception;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2LocalFlowController;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2Stream;
import io.netty.handler.codec.http2.HttpConversionUtil;
import java.util.logging.Level;
import java.util.logging.Logger;
import static io.netty.handler.codec.http2.Http2Error.INTERNAL_ERROR;
import static io.netty.handler.codec.http2.Http2Error.PROTOCOL_ERROR;
import static io.netty.handler.codec.http2.Http2Exception.connectionError;
/**
* A HTTP/2 event adapter for a client.
* This event adapter expects {@link Http2Settings} are sent from the server before the
* {@link HttpRequest} is submitted by sending a header frame, and, if a body exists, a
* data frame.
* The push promises of a server response are acknowledged and the headers of a push promise
* are stored in the {@link HttpRequestContext} for being received later.
*/
public class Http2EventHandler extends Http2EventAdapter {
private static final Logger logger = Logger.getLogger(Http2EventHandler.class.getName());
private final Http2Connection connection;
private final Http2Connection.PropertyKey messageKey;
private final int maxContentLength;
private final boolean validateHttpHeaders;
/**
* Constructor for {@link Http2EventHandler}.
* @param connection the HTTP/2 connection
* @param maxContentLength the maximum content length
* @param validateHeaders true if headers should be validated
*/
Http2EventHandler(Http2Connection connection, int maxContentLength, boolean validateHeaders) {
this.connection = connection;
this.maxContentLength = maxContentLength;
this.validateHttpHeaders = validateHeaders;
this.messageKey = connection.newKey();
}
/**
* Handles an inbound {@code SETTINGS} frame.
* After frame is received, the reuqets is sent.
*
* @param ctx the context from the handler where the frame was read.
* @param settings the settings received from the remote endpoint.
*/
@Override
public void onSettingsRead(ChannelHandlerContext ctx, Http2Settings settings)
throws Http2Exception {
logger.log(Level.FINEST, () -> "settings received " + settings);
Channel channel = ctx.channel();
final HttpRequestContext httpRequestContext =
channel.attr(HttpClientChannelContext.REQUEST_CONTEXT_ATTRIBUTE_KEY).get();
final HttpRequest httpRequest = httpRequestContext.getHttpRequest();
ChannelPromise channelPromise = channel.newPromise();
Http2Headers headers = toHttp2Headers(httpRequestContext);
logger.log(Level.FINEST, () -> "write request " + httpRequest + " headers = " + headers);
boolean hasBody = httpRequestContext.getHttpRequest() instanceof FullHttpRequest;
Http2ConnectionHandler handler = ctx.pipeline().get(Http2ConnectionHandler.class);
Integer streamId = httpRequestContext.getStreamId().get();
ChannelFuture channelFuture = handler.encoder().writeHeaders(ctx, streamId,
headers, 0, !hasBody, channelPromise);
httpRequestContext.putStreamID(streamId, channelFuture, channelPromise);
if (hasBody) {
FullHttpRequest fullHttpRequest = (FullHttpRequest) httpRequestContext.getHttpRequest();
ChannelPromise contentChannelPromise = channel.newPromise();
streamId = httpRequestContext.getStreamId().get();
ChannelFuture contentChannelFuture = handler.encoder().writeData(ctx, streamId,
fullHttpRequest.content(), 0, true, contentChannelPromise);
httpRequestContext.putStreamID(streamId, contentChannelFuture, contentChannelPromise);
channel.flush();
}
httpRequestContext.getSettingsPromise().setSuccess();
}
/**
* Handles an inbound {@code HEADERS} frame.
* <p>
* Only one of the following methods will be called for each {@code HEADERS} frame sequence.
* One will be called when the {@code END_HEADERS} flag has been received.
* <ul>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, boolean)}</li>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, short, boolean, int, boolean)}</li>
* <li>{@link #onPushPromiseRead(ChannelHandlerContext, int, int, Http2Headers, int)}</li>
* </ul>
* <p>
* To say it another way; the {@link Http2Headers} will contain all of the headers
* for the current message exchange step (additional queuing is not necessary).
*
* @param ctx the context from the handler where the frame was read.
* @param streamId the subject stream for the frame.
* @param headers the received headers.
* @param padding additional bytes that should be added to obscure the true content size. Must be between 0 and
* 256 (inclusive).
* @param endOfStream Indicates whether this is the last frame to be sent from the remote endpoint
* for this stream.
*/
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int padding,
boolean endOfStream) throws Http2Exception {
logger.log(Level.FINEST, () -> "headers received " + headers);
Http2Stream stream = connection.stream(streamId);
FullHttpMessage msg = beginHeader(ctx, stream, headers, true, true);
if (msg != null) {
endHeader(ctx, stream, msg, endOfStream);
}
}
/**
* Handles an inbound {@code HEADERS} frame with priority information specified.
* Only called if {@code END_HEADERS} encountered.
* <p>
* Only one of the following methods will be called for each {@code HEADERS} frame sequence.
* One will be called when the {@code END_HEADERS} flag has been received.
* <ul>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, boolean)}</li>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, short, boolean, int, boolean)}</li>
* <li>{@link #onPushPromiseRead(ChannelHandlerContext, int, int, Http2Headers, int)}</li>
* </ul>
* <p>
* To say it another way; the {@link Http2Headers} will contain all of the headers
* for the current message exchange step (additional queuing is not necessary).
*
* @param ctx the context from the handler where the frame was read.
* @param streamId the subject stream for the frame.
* @param headers the received headers.
* @param streamDependency the stream on which this stream depends, or 0 if dependent on the
* connection.
* @param weight the new weight for the stream.
* @param exclusive whether or not the stream should be the exclusive dependent of its parent.
* @param padding additional bytes that should be added to obscure the true content size. Must be between 0 and
* 256 (inclusive).
* @param endOfStream Indicates whether this is the last frame to be sent from the remote endpoint
* for this stream.
*/
@Override
public void onHeadersRead(ChannelHandlerContext ctx, int streamId, Http2Headers headers, int streamDependency,
short weight, boolean exclusive, int padding, boolean endOfStream) throws Http2Exception {
logger.log(Level.FINEST, () -> "headers received " + headers);
Http2Stream stream = connection.stream(streamId);
FullHttpMessage msg = beginHeader(ctx, stream, headers, true, true);
if (msg != null) {
if (streamDependency != Http2CodecUtil.CONNECTION_STREAM_ID) {
msg.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_DEPENDENCY_ID.text(),
streamDependency);
}
msg.headers().setShort(HttpConversionUtil.ExtensionHeaderNames.STREAM_WEIGHT.text(), weight);
endHeader(ctx, stream, msg, endOfStream);
}
}
/**
* Handles an inbound {@code DATA} frame.
*
* @param ctx the context from the handler where the frame was read.
* @param streamId the subject stream for the frame.
* @param data payload buffer for the frame. This buffer will be released by the codec.
* @param padding additional bytes that should be added to obscure the true content size. Must be between 0 and
* 256 (inclusive).
* @param endOfStream Indicates whether this is the last frame to be sent from the remote endpoint for this stream.
* @return the number of bytes that have been processed by the application. The returned bytes are used by the
* inbound flow controller to determine the appropriate time to expand the inbound flow control window (i.e. send
* {@code WINDOW_UPDATE}). Returning a value equal to the length of {@code data} + {@code padding} will effectively
* opt-out of application-level flow control for this frame. Returning a value less than the length of {@code data}
* + {@code padding} will defer the returning of the processed bytes, which the application must later return via
* {@link Http2LocalFlowController#consumeBytes(Http2Stream, int)}. The returned value must
* be >= {@code 0} and <= {@code data.readableBytes()} + {@code padding}.
*/
@Override
public int onDataRead(ChannelHandlerContext ctx, int streamId, ByteBuf data, int padding, boolean endOfStream)
throws Http2Exception {
logger.log(Level.FINEST, () -> "data received " + data);
Http2Stream stream = connection.stream(streamId);
FullHttpMessage msg = getMessage(stream);
if (msg == null) {
throw connectionError(PROTOCOL_ERROR, "data frame received for unknown stream id %d", streamId);
}
ByteBuf content = msg.content();
final int dataReadableBytes = data.readableBytes();
if (content.readableBytes() > maxContentLength - dataReadableBytes) {
throw connectionError(INTERNAL_ERROR,
"content length exceeded maximum of %d for stream id %d", maxContentLength, streamId);
}
content.writeBytes(data, data.readerIndex(), dataReadableBytes);
if (endOfStream) {
fireChannelRead(ctx, msg, false, stream);
}
return dataReadableBytes + padding;
}
/**
* Handles an inbound {@code RST_STREAM} frame. Deletes push stream id if present.
*
* @param ctx the context from the handler where the frame was read.
* @param streamId the stream that is terminating.
* @param errorCode the error code identifying the type of failure.
*/
@Override
public void onRstStreamRead(ChannelHandlerContext ctx, int streamId, long errorCode) throws Http2Exception {
logger.log(Level.FINEST, () -> "rst stream received: error code = " + errorCode);
Http2Stream stream = connection.stream(streamId);
FullHttpMessage msg = getMessage(stream);
if (msg != null) {
removeMessage(stream, true);
}
final HttpRequestContext httpRequestContext =
ctx.channel().attr(HttpClientChannelContext.REQUEST_CONTEXT_ATTRIBUTE_KEY).get();
httpRequestContext.getPushMap().remove(streamId);
}
/**
* Handles an inbound {@code PUSH_PROMISE} frame. Only called if {@code END_HEADERS} encountered.
* <p>
* Promised requests MUST be authoritative, cacheable, and safe.
* See <a href="https://tools.ietf.org/html/draft-ietf-httpbis-http2-17#section-8.2">[RFC http2], Section 8.2</a>.
* <p>
* Only one of the following methods will be called for each {@code HEADERS} frame sequence.
* One will be called when the {@code END_HEADERS} flag has been received.
* <ul>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, boolean)}</li>
* <li>{@link #onHeadersRead(ChannelHandlerContext, int, Http2Headers, int, short, boolean, int, boolean)}</li>
* <li>{@link #onPushPromiseRead(ChannelHandlerContext, int, int, Http2Headers, int)}</li>
* </ul>
* <p>
* To say it another way; the {@link Http2Headers} will contain all of the headers
* for the current message exchange step (additional queuing is not necessary).
*
* @param ctx the context from the handler where the frame was read.
* @param streamId the stream the frame was sent on.
* @param promisedStreamId the ID of the promised stream.
* @param headers the received headers.
* @param padding additional bytes that should be added to obscure the true content size. Must be between 0 and
* 256 (inclusive).
*/
@Override
public void onPushPromiseRead(ChannelHandlerContext ctx, int streamId, int promisedStreamId,
Http2Headers headers, int padding) throws Http2Exception {
logger.log(Level.FINEST, () -> "push promise received: streamId " + streamId +
" promised stream ID = " + promisedStreamId + " headers =" + headers);
Http2Stream promisedStream = connection.stream(promisedStreamId);
FullHttpMessage msg = beginHeader(ctx, promisedStream, headers, false, false);
if (msg != null) {
msg.headers().setInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_PROMISE_ID.text(), streamId);
msg.headers().setShort(HttpConversionUtil.ExtensionHeaderNames.STREAM_WEIGHT.text(),
Http2CodecUtil.DEFAULT_PRIORITY_WEIGHT);
endHeader(ctx, promisedStream, msg, false);
}
Channel channel = ctx.channel();
final HttpRequestContext httpRequestContext =
channel.attr(HttpClientChannelContext.REQUEST_CONTEXT_ATTRIBUTE_KEY).get();
httpRequestContext.receiveStreamID(promisedStreamId, headers, channel.newPromise());
}
/**
* Notifies the listener that the given stream has now been removed from the connection and
* will no longer be returned via {@link Http2Connection#stream(int)}. The connection may
* maintain inactive streams for some time before removing them.
* <p>
* If a {@link RuntimeException} is thrown it will be logged and <strong>not propagated</strong>.
* Throwing from this method is not supported and is considered a programming error.
*/
@Override
public void onStreamRemoved(Http2Stream stream) {
logger.log(Level.FINEST, () -> "stream removed " + stream);
removeMessage(stream, true);
}
/**
* Create a new {@link FullHttpMessage} based upon the current connection parameters.
*
* @param stream The stream to create a message for
* @param headers The headers associated with {@code stream}
* @param validateHttpHeaders
* <ul>
* <li>{@code true} to validate HTTP headers in the http-codec</li>
* <li>{@code false} not to validate HTTP headers in the http-codec</li>
* </ul>
* @param alloc The {@link ByteBufAllocator} to use to generate the content of the message
* @throws Http2Exception if message can not be created
*/
private FullHttpMessage newMessage(Http2Stream stream, Http2Headers headers, boolean validateHttpHeaders,
ByteBufAllocator alloc) throws Http2Exception {
if (headers.status() != null) {
return HttpConversionUtil.toHttpResponse(stream.id(), headers, alloc, validateHttpHeaders);
} else {
return null;
}
}
/**
* Get the {@link FullHttpMessage} associated with {@code stream}.
* @param stream The stream to get the associated state from
* @return The {@link FullHttpMessage} associated with {@code stream}.
*/
private FullHttpMessage getMessage(Http2Stream stream) {
return (FullHttpMessage) stream.getProperty(messageKey);
}
/**
* Make {@code message} be the state associated with {@code stream}.
* @param stream The stream which {@code message} is associated with.
* @param message The message which contains the HTTP semantics.
*/
private void putMessage(Http2Stream stream, FullHttpMessage message) {
FullHttpMessage previous = stream.setProperty(messageKey, message);
if (previous != message && previous != null) {
previous.release();
}
}
/**
* The stream is out of scope for the HTTP message flow and will no longer be tracked
* @param stream The stream to remove associated state with
* @param release {@code true} to call release on the value if it is present. {@code false} to not call release.
*/
private void removeMessage(Http2Stream stream, boolean release) {
FullHttpMessage msg = stream.removeProperty(messageKey);
if (release && msg != null) {
msg.release();
}
}
/**
* Set final headers and fire a channel read event
*
* @param ctx The context to fire the event on
* @param msg The message to send
* @param release {@code true} to call release on the value if it is present. {@code false} to not call release.
* @param stream the stream of the message which is being fired
*/
private void fireChannelRead(ChannelHandlerContext ctx, FullHttpMessage msg, boolean release,
Http2Stream stream) {
removeMessage(stream, release);
HttpUtil.setContentLength(msg, msg.content().readableBytes());
ctx.fireChannelRead(msg);
}
private FullHttpMessage beginHeader(ChannelHandlerContext ctx, Http2Stream stream, Http2Headers headers,
boolean allowAppend, boolean appendToTrailer) throws Http2Exception {
FullHttpMessage msg = getMessage(stream);
if (msg == null) {
msg = newMessage(stream, headers, validateHttpHeaders, ctx.alloc());
} else {
if (allowAppend) {
HttpConversionUtil.addHttp2ToHttpHeaders(stream.id(), headers, msg, appendToTrailer);
} else {
throw new Http2Exception(Http2Error.PROTOCOL_ERROR, "stream already exists");
}
}
return msg;
}
private void endHeader(ChannelHandlerContext ctx, Http2Stream stream, FullHttpMessage msg,
boolean endOfStream) {
if (endOfStream) {
fireChannelRead(ctx, msg, getMessage(stream) != msg, stream);
} else {
putMessage(stream, msg);
}
}
private static Http2Headers toHttp2Headers(HttpRequestContext httpRequestContext) {
HttpRequest httpRequest = httpRequestContext.getHttpRequest();
Http2Headers headers = new DefaultHttp2Headers()
.method(httpRequest.method().asciiName())
.path(httpRequest.uri())
.scheme(httpRequestContext.getURI().getScheme())
.authority(httpRequestContext.getURI().getHost());
HttpConversionUtil.toHttp2Headers(httpRequest.headers(), headers);
return headers;
}
}

View file

@ -1,161 +0,0 @@
/*
* Copyright 2017 Jörg Prante
*
* Jörg Prante licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.xbib.netty.http.client;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.pool.ChannelPool;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http2.HttpConversionUtil;
import io.netty.util.internal.PlatformDependent;
import java.util.AbstractMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Netty channel handler for HTTP/2 responses.
*/
@ChannelHandler.Sharable
public class Http2Handler extends SimpleChannelInboundHandler<FullHttpResponse> {
private static final Logger logger = Logger.getLogger(Http2Handler.class.getName());
private final Map<Integer, Entry<ChannelFuture, ChannelPromise>> streamidPromiseMap;
private final HttpClient httpClient;
Http2Handler(HttpClient httpClient) {
this.streamidPromiseMap = PlatformDependent.newConcurrentHashMap();
this.httpClient = httpClient;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse httpResponse) throws Exception {
logger.log(Level.FINE, () -> httpResponse.getClass().getName());
Integer streamId = httpResponse.headers().getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text());
if (streamId == null) {
logger.log(Level.WARNING, () -> "stream ID missing");
return;
}
final HttpRequestContext httpRequestContext =
ctx.channel().attr(HttpClientChannelContext.REQUEST_CONTEXT_ATTRIBUTE_KEY).get();
Entry<ChannelFuture, ChannelPromise> entry = streamidPromiseMap.get(streamId);
if (entry != null) {
HttpResponseListener httpResponseListener =
ctx.channel().attr(HttpClientChannelContext.RESPONSE_LISTENER_ATTRIBUTE_KEY).get();
if (httpResponseListener != null) {
httpResponseListener.onResponse(httpResponse);
}
entry.getValue().setSuccess();
if (httpClient.tryRedirect(ctx.channel(), httpResponse, httpRequestContext)) {
return;
}
logger.log(Level.FINE, () -> "success");
httpRequestContext.success("response arrived");
final ChannelPool channelPool =
ctx.channel().attr(HttpClientChannelContext.CHANNEL_POOL_ATTRIBUTE_KEY).get();
channelPool.release(ctx.channel());
} else {
logger.log(Level.WARNING, () -> "stream id not found in promises: " + streamId);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.log(Level.FINE, ctx::toString);
final ChannelPool channelPool =
ctx.channel().attr(HttpClientChannelContext.CHANNEL_POOL_ATTRIBUTE_KEY).get();
channelPool.release(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ExceptionListener exceptionListener =
ctx.channel().attr(HttpClientChannelContext.EXCEPTION_LISTENER_ATTRIBUTE_KEY).get();
logger.log(Level.FINE, () -> "exception caught");
if (exceptionListener != null) {
exceptionListener.onException(cause);
}
final HttpRequestContext httpRequestContext =
ctx.channel().attr(HttpClientChannelContext.REQUEST_CONTEXT_ATTRIBUTE_KEY).get();
httpRequestContext.fail(cause.getMessage());
final ChannelPool channelPool =
ctx.channel().attr(HttpClientChannelContext.CHANNEL_POOL_ATTRIBUTE_KEY).get();
channelPool.release(ctx.channel());
}
void put(int streamId, ChannelFuture channelFuture, ChannelPromise promise) {
logger.log(Level.FINE, "put stream ID " + streamId);
streamidPromiseMap.put(streamId, new AbstractMap.SimpleEntry<>(channelFuture, promise));
}
void awaitResponses(HttpRequestContext httpRequestContext, ExceptionListener exceptionListener) {
int timeout = httpRequestContext.getTimeout();
Iterator<Entry<Integer, Entry<ChannelFuture, ChannelPromise>>> iterator = streamidPromiseMap.entrySet().iterator();
while (iterator.hasNext()) {
Entry<Integer, Entry<ChannelFuture, ChannelPromise>> entry = iterator.next();
ChannelFuture channelFuture = entry.getValue().getKey();
if (!channelFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS)) {
IllegalStateException illegalStateException =
new IllegalStateException("time out while waiting to write for stream id " + entry.getKey());
if (exceptionListener != null) {
exceptionListener.onException(illegalStateException);
httpRequestContext.fail(illegalStateException.getMessage());
final ChannelPool channelPool =
channelFuture.channel().attr(HttpClientChannelContext.CHANNEL_POOL_ATTRIBUTE_KEY).get();
channelPool.release(channelFuture.channel());
}
throw illegalStateException;
}
if (!channelFuture.isSuccess()) {
throw new RuntimeException(channelFuture.cause());
}
ChannelPromise promise = entry.getValue().getValue();
if (!promise.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS)) {
IllegalStateException illegalStateException =
new IllegalStateException("time out while waiting for response on stream id " + entry.getKey());
if (exceptionListener != null) {
exceptionListener.onException(illegalStateException);
httpRequestContext.fail(illegalStateException.getMessage());
final ChannelPool channelPool =
channelFuture.channel().attr(HttpClientChannelContext.CHANNEL_POOL_ATTRIBUTE_KEY).get();
channelPool.release(channelFuture.channel());
}
throw illegalStateException;
}
if (!promise.isSuccess()) {
RuntimeException runtimeException = new RuntimeException(promise.cause());
if (exceptionListener != null) {
exceptionListener.onException(runtimeException);
httpRequestContext.fail(runtimeException.getMessage());
final ChannelPool channelPool =
channelFuture.channel().attr(HttpClientChannelContext.CHANNEL_POOL_ATTRIBUTE_KEY).get();
channelPool.release(channelFuture.channel());
}
throw runtimeException;
}
iterator.remove();
}
}
}

View file

@ -0,0 +1,146 @@
/*
* Copyright 2017 Jörg Prante
*
* Jörg Prante licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.xbib.netty.http.client;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.pool.ChannelPool;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.cookie.ClientCookieDecoder;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.HttpConversionUtil;
import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.ExceptionListener;
import org.xbib.netty.http.client.listener.HttpHeadersListener;
import org.xbib.netty.http.client.listener.HttpPushListener;
import org.xbib.netty.http.client.listener.HttpResponseListener;
import java.util.Map.Entry;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Netty channel handler for HTTP/2 responses.
*/
@ChannelHandler.Sharable
public class Http2ResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
private static final Logger logger = Logger.getLogger(Http2ResponseHandler.class.getName());
private final HttpClient httpClient;
Http2ResponseHandler(HttpClient httpClient) {
this.httpClient = httpClient;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse httpResponse) throws Exception {
logger.log(Level.FINE, () -> httpResponse.getClass().getName());
Integer streamId = httpResponse.headers().getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text());
if (streamId == null) {
logger.log(Level.WARNING, () -> "stream ID missing in headers");
return;
}
final HttpRequestContext httpRequestContext =
ctx.channel().attr(HttpClientChannelContext.REQUEST_CONTEXT_ATTRIBUTE_KEY).get();
HttpHeaders httpHeaders = httpResponse.headers();
HttpHeadersListener httpHeadersListener =
ctx.channel().attr(HttpClientChannelContext.HEADER_LISTENER_ATTRIBUTE_KEY).get();
if (httpHeadersListener != null) {
logger.log(Level.FINE, () -> "firing onHeaders");
httpHeadersListener.onHeaders(httpHeaders);
}
CookieListener cookieListener =
ctx.channel().attr(HttpClientChannelContext.COOKIE_LISTENER_ATTRIBUTE_KEY).get();
for (String cookieString : httpHeaders.getAll(HttpHeaderNames.SET_COOKIE)) {
Cookie cookie = ClientCookieDecoder.STRICT.decode(cookieString);
httpRequestContext.addCookie(cookie);
if (cookieListener != null) {
logger.log(Level.FINE, () -> "firing onCookie");
cookieListener.onCookie(cookie);
}
}
Entry<Http2Headers, ChannelPromise> pushEntry = httpRequestContext.getPushMap().get(streamId);
if (pushEntry != null) {
final HttpPushListener httpPushListener =
ctx.channel().attr(HttpClientChannelContext.PUSH_LISTENER_ATTRIBUTE_KEY).get();
if (httpPushListener != null) {
httpPushListener.onPushReceived(pushEntry.getKey(), httpResponse);
}
if (!pushEntry.getValue().isSuccess()) {
pushEntry.getValue().setSuccess();
}
httpRequestContext.getPushMap().remove(streamId);
if (httpRequestContext.isFinished()) {
httpRequestContext.success("response finished");
}
return;
}
Entry<ChannelFuture, ChannelPromise> promiseEntry = httpRequestContext.getStreamIdPromiseMap().get(streamId);
if (promiseEntry != null) {
final HttpResponseListener httpResponseListener =
ctx.channel().attr(HttpClientChannelContext.RESPONSE_LISTENER_ATTRIBUTE_KEY).get();
if (httpResponseListener != null) {
httpResponseListener.onResponse(httpResponse);
}
if (!promiseEntry.getValue().isSuccess()) {
promiseEntry.getValue().setSuccess();
}
if (httpClient.tryRedirect(ctx.channel(), httpResponse, httpRequestContext)) {
return;
}
httpRequestContext.getStreamIdPromiseMap().remove(streamId);
if (httpRequestContext.isFinished()) {
httpRequestContext.success("response finished");
}
}
}
/**
* The only method to release a HTTP/2 channel back to the pool is to wait for inactivity.
* @param ctx the channel handler context
* @throws Exception if the channel could not be released back to the pool
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.log(Level.FINE, ctx::toString);
final ChannelPool channelPool =
ctx.channel().attr(HttpClientChannelContext.CHANNEL_POOL_ATTRIBUTE_KEY).get();
channelPool.release(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.log(Level.FINE, () -> "exception caught: " + cause);
ExceptionListener exceptionListener =
ctx.channel().attr(HttpClientChannelContext.EXCEPTION_LISTENER_ATTRIBUTE_KEY).get();
if (exceptionListener != null) {
exceptionListener.onException(cause);
}
final HttpRequestContext httpRequestContext =
ctx.channel().attr(HttpClientChannelContext.REQUEST_CONTEXT_ATTRIBUTE_KEY).get();
httpRequestContext.fail(cause.getMessage());
final ChannelPool channelPool =
ctx.channel().attr(HttpClientChannelContext.CHANNEL_POOL_ATTRIBUTE_KEY).get();
channelPool.release(ctx.channel());
}
}

View file

@ -18,6 +18,7 @@ package org.xbib.netty.http.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
@ -32,15 +33,27 @@ import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.cookie.ClientCookieEncoder;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.ExceptionListener;
import org.xbib.netty.http.client.listener.HttpPushListener;
import org.xbib.netty.http.client.listener.HttpHeadersListener;
import org.xbib.netty.http.client.listener.HttpResponseListener;
import org.xbib.netty.http.client.util.InetAddressKey;
import org.xbib.netty.http.client.util.NetworkUtils;
import java.io.Closeable;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.URL;
import java.net.URI;
import java.net.URLDecoder;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -51,6 +64,8 @@ public final class HttpClient implements Closeable {
private static final Logger logger = Logger.getLogger(HttpClient.class.getName());
private static final AtomicInteger streamId = new AtomicInteger(3);
private final ByteBufAllocator byteBufAllocator;
private final EventLoopGroup eventLoopGroup;
@ -58,7 +73,7 @@ public final class HttpClient implements Closeable {
private final HttpClientChannelPoolMap poolMap;
/**
* Create a new HTTP client.
* Create a new HTTP client. Use {@link #builder()} to build HTTP client instance.
*/
HttpClient(ByteBufAllocator byteBufAllocator,
EventLoopGroup eventLoopGroup,
@ -68,6 +83,9 @@ public final class HttpClient implements Closeable {
this.byteBufAllocator = byteBufAllocator;
this.eventLoopGroup = eventLoopGroup;
this.poolMap = new HttpClientChannelPoolMap(this, httpClientChannelContext, bootstrap, maxConnections);
NetworkUtils.extendSystemProperties();
logger.log(Level.FINE, () -> "local host name = " + NetworkUtils.getLocalHostName("localhost"));
logger.log(Level.FINE, NetworkUtils::displayNetworkInterfaces);
}
/**
@ -80,7 +98,7 @@ public final class HttpClient implements Closeable {
}
public HttpClientRequestBuilder prepareRequest(HttpMethod method) {
return new HttpClientRequestBuilder(this, method, byteBufAllocator);
return new HttpClientRequestBuilder(this, method, byteBufAllocator, streamId.getAndAdd(2));
}
/**
@ -172,11 +190,18 @@ public final class HttpClient implements Closeable {
logger.log(Level.FINE, () -> "closed");
}
void dispatch(HttpRequestContext httpRequestContext, HttpResponseListener httpResponseListener,
ExceptionListener exceptionListener) {
final URL url = httpRequestContext.getURL();
void dispatch(final HttpRequestContext httpRequestContext) {
final URI uri = httpRequestContext.getURI();
final HttpRequest httpRequest = httpRequestContext.getHttpRequest();
logger.log(Level.FINE, () -> "trying URL " + url);
if (!httpRequestContext.getCookies().isEmpty()) {
logger.log(Level.FINE, () -> "configured cookies: " + httpRequestContext.getCookies());
Collection<Cookie> cookies = httpRequestContext.matchCookies();
if (!cookies.isEmpty()) {
logger.log(Level.FINE, () -> "updating cookie header with matched cookies: " + cookies);
httpRequest.headers().set(HttpHeaderNames.COOKIE, ClientCookieEncoder.STRICT.encode(cookies));
}
}
logger.log(Level.FINE, () -> "trying URL " + uri);
if (httpRequestContext.isExpired()) {
httpRequestContext.fail("request expired");
}
@ -185,27 +210,31 @@ public final class HttpClient implements Closeable {
return;
}
HttpVersion version = httpRequestContext.getHttpRequest().protocolVersion();
InetAddressKey inetAddressKey = new InetAddressKey(url, version);
// effectivly disable pool for HTTP/2
if (version.majorVersion() == 2) {
poolMap.remove(inetAddressKey);
}
boolean secure = "https".equals(uri.getScheme());
InetAddressKey inetAddressKey = new InetAddressKey(uri.getHost(), uri.getPort(), version, secure);
final FixedChannelPool pool = poolMap.get(inetAddressKey);
logger.log(Level.FINE, () -> "connecting to " + inetAddressKey);
Future<Channel> futureChannel = pool.acquire();
futureChannel.addListener((FutureListener<Channel>) future -> {
final ExceptionListener exceptionListener = httpRequestContext.getExceptionListener();
if (future.isSuccess()) {
Channel channel = future.getNow();
// set settings promise before adding httpRequestContext as a channel attribute
ChannelPromise settingsPromise = channel.newPromise();
httpRequestContext.setSettingsPromise(settingsPromise);
channel.attr(HttpClientChannelContext.CHANNEL_POOL_ATTRIBUTE_KEY).set(pool);
channel.attr(HttpClientChannelContext.REQUEST_CONTEXT_ATTRIBUTE_KEY).set(httpRequestContext);
if (httpResponseListener != null) {
HttpResponseListener httpResponseListener = httpRequestContext.getHttpResponseListener();
channel.attr(HttpClientChannelContext.RESPONSE_LISTENER_ATTRIBUTE_KEY).set(httpResponseListener);
}
if (exceptionListener != null) {
HttpPushListener httpPushListener = httpRequestContext.getHttpPushListener();
channel.attr(HttpClientChannelContext.PUSH_LISTENER_ATTRIBUTE_KEY).set(httpPushListener);
HttpHeadersListener httpHeadersListener = httpRequestContext.getHttpHeadersListener();
channel.attr(HttpClientChannelContext.HEADER_LISTENER_ATTRIBUTE_KEY).set(httpHeadersListener);
CookieListener cookieListener = httpRequestContext.getCookieListener();
channel.attr(HttpClientChannelContext.COOKIE_LISTENER_ATTRIBUTE_KEY).set(cookieListener);
channel.attr(HttpClientChannelContext.EXCEPTION_LISTENER_ATTRIBUTE_KEY).set(exceptionListener);
}
if (httpRequestContext.isFailed()) {
logger.log(Level.FINE, () -> "detected fail, close now");
logger.log(Level.FINE, () -> "detected fail, close channel");
future.cancel(true);
if (channel.isOpen()) {
channel.close();
@ -227,22 +256,60 @@ public final class HttpClient implements Closeable {
}
});
} else if (httpRequest.protocolVersion().majorVersion() == 2) {
HttpClientChannelInitializer.Http2SettingsHandler http2SettingsHandler =
poolMap.getHttpClientChannelInitializer().getHttp2SettingsHandler();
if (http2SettingsHandler != null) {
logger.log(Level.FINE, "HTTP2: waiting for settings");
http2SettingsHandler.awaitSettings(httpRequestContext, exceptionListener);
logger.log(Level.FINE, () -> "waiting for HTTP/2 settings");
settingsPromise.await(httpRequestContext.getTimeout(), TimeUnit.MILLISECONDS);
logger.log(Level.FINE, () -> "waiting for HTTP/2 responses = " + httpRequestContext.getStreamIdPromiseMap().size());
int timeout = httpRequestContext.getTimeout();
for (Map.Entry<Integer, Map.Entry<ChannelFuture, ChannelPromise>> entry :
httpRequestContext.getStreamIdPromiseMap().entrySet()) {
ChannelFuture channelFuture = entry.getValue().getKey();
if (channelFuture != null) {
logger.log(Level.FINE, "waiting for channel, stream ID = " + entry.getKey());
if (!channelFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS)) {
IllegalStateException illegalStateException =
new IllegalStateException("time out while waiting to write for stream id " + entry.getKey());
if (exceptionListener != null) {
exceptionListener.onException(illegalStateException);
httpRequestContext.fail(illegalStateException.getMessage());
final ChannelPool channelPool =
channelFuture.channel().attr(HttpClientChannelContext.CHANNEL_POOL_ATTRIBUTE_KEY).get();
channelPool.release(channelFuture.channel());
}
throw illegalStateException;
}
if (!channelFuture.isSuccess()) {
throw new RuntimeException(channelFuture.cause());
}
}
ChannelPromise promise = entry.getValue().getValue();
logger.log(Level.FINE, "waiting for promise of stream ID = " + entry.getKey());
if (!promise.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS)) {
IllegalStateException illegalStateException =
new IllegalStateException("time out while waiting for response on stream id " + entry.getKey());
if (exceptionListener != null) {
exceptionListener.onException(illegalStateException);
httpRequestContext.fail(illegalStateException.getMessage());
if (channelFuture != null) {
final ChannelPool channelPool =
channelFuture.channel().attr(HttpClientChannelContext.CHANNEL_POOL_ATTRIBUTE_KEY).get();
channelPool.release(channelFuture.channel());
}
}
throw illegalStateException;
}
if (!promise.isSuccess()) {
RuntimeException runtimeException = new RuntimeException(promise.cause());
if (exceptionListener != null) {
exceptionListener.onException(runtimeException);
httpRequestContext.fail(runtimeException.getMessage());
if (channelFuture != null) {
final ChannelPool channelPool =
channelFuture.channel().attr(HttpClientChannelContext.CHANNEL_POOL_ATTRIBUTE_KEY).get();
channelPool.release(channelFuture.channel());
}
}
throw runtimeException;
}
Http2Handler http2Handler = poolMap.getHttpClientChannelInitializer().getHttp2Handler();
if (http2Handler != null) {
logger.log(Level.FINE, () ->
"HTTP2: trying to write, streamID=" + httpRequestContext.getStreamId() +
" request: " + httpRequest.toString());
ChannelPromise channelPromise = channel.newPromise();
http2Handler.put(httpRequestContext.getStreamId(), channel.write(httpRequest), channelPromise);
channel.flush();
logger.log(Level.FINE, "HTTP2: waiting for responses");
http2Handler.awaitResponses(httpRequestContext, exceptionListener);
}
}
} else {
@ -262,7 +329,7 @@ public final class HttpClient implements Closeable {
HttpMethod method = httpResponse.status().code() == 303 ? HttpMethod.GET :
httpRequestContext.getHttpRequest().method();
if (httpRequestContext.getRedirectCount().getAndIncrement() < httpRequestContext.getMaxRedirects()) {
dispatchRedirect(channel, method, new URL(redirUrl), httpRequestContext);
dispatchRedirect(method, URI.create(redirUrl), httpRequestContext);
} else {
httpRequestContext.fail("too many redirections");
final ChannelPool channelPool =
@ -295,7 +362,7 @@ public final class HttpClient implements Closeable {
return location;
} else {
logger.log(Level.FINE, "(relative->absolute) redirect to " + location);
return makeAbsolute(httpRequestContext.getURL(), location);
return makeAbsolute(httpRequestContext.getURI(), location);
}
default:
break;
@ -303,35 +370,31 @@ public final class HttpClient implements Closeable {
return null;
}
private void dispatchRedirect(Channel channel, HttpMethod method, URL url,
private void dispatchRedirect(HttpMethod method, URI uri,
HttpRequestContext httpRequestContext) {
final String uri = httpRequestContext.getHttpRequest().protocolVersion().majorVersion() == 2 ?
url.toExternalForm() : makeRelative(url);
final String uriStr = httpRequestContext.getHttpRequest().protocolVersion().majorVersion() == 2 ?
uri.toASCIIString() : makeRelative(uri);
final HttpRequest httpRequest;
if (method.equals(httpRequestContext.getHttpRequest().method()) &&
httpRequestContext.getHttpRequest() instanceof DefaultFullHttpRequest) {
DefaultFullHttpRequest defaultFullHttpRequest = (DefaultFullHttpRequest) httpRequestContext.getHttpRequest();
FullHttpRequest fullHttpRequest = defaultFullHttpRequest.copy();
fullHttpRequest.setUri(uri);
fullHttpRequest.setUri(uriStr);
httpRequest = fullHttpRequest;
} else {
httpRequest = new DefaultHttpRequest(httpRequestContext.getHttpRequest().protocolVersion(), method, uri);
httpRequest = new DefaultHttpRequest(httpRequestContext.getHttpRequest().protocolVersion(), method, uriStr);
}
for (Map.Entry<String, String> e : httpRequestContext.getHttpRequest().headers().entries()) {
httpRequest.headers().add(e.getKey(), e.getValue());
}
httpRequest.headers().set(HttpHeaderNames.HOST, url.getHost());
HttpRequestContext redirectContext = new HttpRequestContext(url, httpRequest,
httpRequest.headers().set(HttpHeaderNames.HOST, uri.getHost());
HttpRequestContext redirectContext = new HttpRequestContext(uri, httpRequest,
httpRequestContext);
logger.log(Level.FINE, "dispatchRedirect url = " + url + " with new request " + httpRequest.toString());
HttpResponseListener httpResponseListener =
channel.attr(HttpClientChannelContext.RESPONSE_LISTENER_ATTRIBUTE_KEY).get();
ExceptionListener exceptionListener =
channel.attr(HttpClientChannelContext.EXCEPTION_LISTENER_ATTRIBUTE_KEY).get();
dispatch(redirectContext, httpResponseListener, exceptionListener);
logger.log(Level.FINE, "dispatchRedirect url = " + uri + " with new request " + httpRequest.toString());
dispatch(redirectContext);
}
private String makeRelative(URL base) {
private String makeRelative(URI base) {
String uri = base.getPath();
if (base.getQuery() != null) {
uri = uri + "?" + base.getQuery();
@ -339,7 +402,7 @@ public final class HttpClient implements Closeable {
return uri;
}
private String makeAbsolute(URL base, String location) throws UnsupportedEncodingException {
private String makeAbsolute(URI base, String location) throws UnsupportedEncodingException {
String path = base.getPath() == null ? "/" : URLDecoder.decode(base.getPath(), "UTF-8");
if (location.startsWith("/")) {
path = location;
@ -348,7 +411,7 @@ public final class HttpClient implements Closeable {
} else {
path += "/" + location;
}
String scheme = base.getProtocol();
String scheme = base.getScheme();
StringBuilder sb = new StringBuilder(scheme).append("://").append(base.getHost());
int defaultPort = "http".equals(scheme) ? 80 : "https".equals(scheme) ? 443 : -1;
if (defaultPort != -1 && base.getPort() != -1 && defaultPort != base.getPort()) {

View file

@ -28,6 +28,7 @@ import io.netty.handler.proxy.Socks4ProxyHandler;
import io.netty.handler.proxy.Socks5ProxyHandler;
import io.netty.handler.ssl.CipherSuiteFilter;
import io.netty.handler.ssl.SslProvider;
import org.xbib.netty.http.client.util.ClientAuthMode;
import java.io.InputStream;
import java.net.InetSocketAddress;
@ -46,7 +47,7 @@ public class HttpClientBuilder implements HttpClientChannelContextDefaults {
private Bootstrap bootstrap;
// let Netty decide, where default is Runtime.getRuntime().availableProcessors() * 2
// let Netty decide about thread number, default is Runtime.getRuntime().availableProcessors() * 2
private int threads = 0;
private boolean tcpNodelay = DEFAULT_TCP_NODELAY;
@ -95,7 +96,7 @@ public class HttpClientBuilder implements HttpClientChannelContextDefaults {
private boolean useServerNameIdentification = DEFAULT_USE_SERVER_NAME_IDENTIFICATION;
private SslClientAuthMode sslClientAuthMode = DEFAULT_SSL_CLIENT_AUTH_MODE;
private ClientAuthMode clientAuthMode = DEFAULT_SSL_CLIENT_AUTH_MODE;
private HttpProxyHandler httpProxyHandler;
@ -252,8 +253,8 @@ public class HttpClientBuilder implements HttpClientChannelContextDefaults {
return this;
}
public HttpClientBuilder setSslClientAuthMode(SslClientAuthMode sslClientAuthMode) {
this.sslClientAuthMode = sslClientAuthMode;
public HttpClientBuilder setClientAuthMode(ClientAuthMode clientAuthMode) {
this.clientAuthMode = clientAuthMode;
return this;
}
@ -319,7 +320,7 @@ public class HttpClientBuilder implements HttpClientChannelContextDefaults {
readTimeoutMillis, enableGzip, installHttp2Upgrade,
sslProvider, ciphers, cipherSuiteFilter, trustManagerFactory,
keyCertChainInputStream, keyInputStream, keyPassword,
useServerNameIdentification, sslClientAuthMode,
useServerNameIdentification, clientAuthMode,
httpProxyHandler, socks4ProxyHandler, socks5ProxyHandler);
return new HttpClient(byteBufAllocator, eventLoopGroup, bootstrap, maxConnections, httpClientChannelContext);
}

View file

@ -22,6 +22,12 @@ import io.netty.handler.proxy.Socks5ProxyHandler;
import io.netty.handler.ssl.CipherSuiteFilter;
import io.netty.handler.ssl.SslProvider;
import io.netty.util.AttributeKey;
import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.ExceptionListener;
import org.xbib.netty.http.client.listener.HttpPushListener;
import org.xbib.netty.http.client.listener.HttpHeadersListener;
import org.xbib.netty.http.client.listener.HttpResponseListener;
import org.xbib.netty.http.client.util.ClientAuthMode;
import java.io.InputStream;
import javax.net.ssl.TrustManagerFactory;
@ -39,6 +45,15 @@ final class HttpClientChannelContext {
static final AttributeKey<HttpResponseListener> RESPONSE_LISTENER_ATTRIBUTE_KEY =
AttributeKey.valueOf("httpClientResponseListener");
static final AttributeKey<HttpHeadersListener> HEADER_LISTENER_ATTRIBUTE_KEY =
AttributeKey.valueOf("httpHeaderListener");
static final AttributeKey<CookieListener> COOKIE_LISTENER_ATTRIBUTE_KEY =
AttributeKey.valueOf("cookieListener");
static final AttributeKey<HttpPushListener> PUSH_LISTENER_ATTRIBUTE_KEY =
AttributeKey.valueOf("pushListener");
static final AttributeKey<ExceptionListener> EXCEPTION_LISTENER_ATTRIBUTE_KEY =
AttributeKey.valueOf("httpClientExceptionListener");
@ -74,7 +89,7 @@ final class HttpClientChannelContext {
private final boolean useServerNameIdentification;
private final SslClientAuthMode sslClientAuthMode;
private final ClientAuthMode clientAuthMode;
private final HttpProxyHandler httpProxyHandler;
@ -98,7 +113,7 @@ final class HttpClientChannelContext {
InputStream keyInputStream,
String keyPassword,
boolean useServerNameIdentification,
SslClientAuthMode sslClientAuthMode,
ClientAuthMode clientAuthMode,
HttpProxyHandler httpProxyHandler,
Socks4ProxyHandler socks4ProxyHandler,
Socks5ProxyHandler socks5ProxyHandler) {
@ -118,7 +133,7 @@ final class HttpClientChannelContext {
this.keyInputStream = keyInputStream;
this.keyPassword = keyPassword;
this.useServerNameIdentification = useServerNameIdentification;
this.sslClientAuthMode = sslClientAuthMode;
this.clientAuthMode = clientAuthMode;
this.httpProxyHandler = httpProxyHandler;
this.socks4ProxyHandler = socks4ProxyHandler;
this.socks5ProxyHandler = socks5ProxyHandler;
@ -188,8 +203,8 @@ final class HttpClientChannelContext {
return useServerNameIdentification;
}
SslClientAuthMode getSslClientAuthMode() {
return sslClientAuthMode;
ClientAuthMode getClientAuthMode() {
return clientAuthMode;
}
HttpProxyHandler getHttpProxyHandler() {

View file

@ -17,9 +17,12 @@ package org.xbib.netty.http.client;
import io.netty.handler.codec.http2.Http2SecurityUtil;
import io.netty.handler.ssl.CipherSuiteFilter;
import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.SupportedCipherSuiteFilter;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import org.xbib.netty.http.client.util.InetAddressKey;
import org.xbib.netty.http.client.util.ClientAuthMode;
import javax.net.ssl.TrustManagerFactory;
@ -105,7 +108,7 @@ public interface HttpClientChannelContextDefaults {
/**
* Default SSL provider.
*/
SslProvider DEFAULT_SSL_PROVIDER = SslProvider.OPENSSL;
SslProvider DEFAULT_SSL_PROVIDER = OpenSsl.isAlpnSupported() ? SslProvider.OPENSSL : SslProvider.JDK;
Iterable<String> DEFAULT_CIPHERS = Http2SecurityUtil.CIPHERS;
@ -118,5 +121,5 @@ public interface HttpClientChannelContextDefaults {
/**
* Default for SSL client authentication.
*/
SslClientAuthMode DEFAULT_SSL_CLIENT_AUTH_MODE = SslClientAuthMode.NONE;
ClientAuthMode DEFAULT_SSL_CLIENT_AUTH_MODE = ClientAuthMode.NONE;
}

View file

@ -15,12 +15,12 @@
*/
package org.xbib.netty.http.client;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
@ -36,19 +36,19 @@ import io.netty.handler.codec.http2.Http2ClientUpgradeCodec;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionPrefaceWrittenEvent;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandler;
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandlerBuilder;
import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapterBuilder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.ApplicationProtocolConfig;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
import io.netty.handler.ssl.SslCloseCompletionEvent;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import org.xbib.netty.http.client.listener.ExceptionListener;
import org.xbib.netty.http.client.util.InetAddressKey;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
@ -61,33 +61,41 @@ import javax.net.ssl.SSLException;
import javax.net.ssl.SSLParameters;
/**
* Netty HTTP client channel initializer.
*/
class HttpClientChannelInitializer extends ChannelInitializer<SocketChannel> {
private static final Logger logger = Logger.getLogger(HttpClientChannelInitializer.class.getName());
private static final Http2FrameLogger frameLogger =
new Http2FrameLogger(LogLevel.TRACE, HttpClientChannelInitializer.class);
private final HttpClientChannelContext context;
private final Http1Handler http1Handler;
private final HttpHandler httpHandler;
private final Http2Handler http2Handler;
private final Http2ResponseHandler http2ResponseHandler;
private InetAddressKey key;
private Http2SettingsHandler http2SettingsHandler;
private UserEventLogger userEventLogger;
HttpClientChannelInitializer(HttpClientChannelContext context, Http1Handler http1Handler,
Http2Handler http2Handler) {
/**
* Constructor for a new {@link HttpClientChannelInitializer}.
* @param context the HTTP client channel context
* @param httpHandler the HTTP 1.x handler
* @param http2ResponseHandler the HTTP 2 handler
*/
HttpClientChannelInitializer(HttpClientChannelContext context, HttpHandler httpHandler,
Http2ResponseHandler http2ResponseHandler) {
this.context = context;
this.http1Handler = http1Handler;
this.http2Handler = http2Handler;
this.httpHandler = httpHandler;
this.http2ResponseHandler = http2ResponseHandler;
}
/**
* Sets up a {@link InetAddressKey} for the channel initialization and initializes the channel.
* Using this method, the channel initializer can handle secure channels, the HTTP protocol version,
* and the host name for Server Name Identification (SNI).
* @param ch the channel
* @param key the key of the internet address
* @throws Exception if channel
*/
void initChannel(SocketChannel ch, InetAddressKey key) throws Exception {
this.key = key;
initChannel(ch);
@ -96,6 +104,9 @@ class HttpClientChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
logger.log(Level.FINE, () -> "initChannel with key = " + key);
if (key == null) {
throw new IllegalStateException("no key set for channel initialization");
}
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new TrafficLoggingHandler());
if (context.getHttpProxyHandler() != null) {
@ -108,22 +119,12 @@ class HttpClientChannelInitializer extends ChannelInitializer<SocketChannel> {
pipeline.addLast(context.getSocks5ProxyHandler());
}
pipeline.addLast(new ReadTimeoutHandler(context.getReadTimeoutMillis(), TimeUnit.MILLISECONDS));
http2SettingsHandler = new Http2SettingsHandler(ch.newPromise());
userEventLogger = new UserEventLogger();
if (context.getSslProvider() != null && key.isSecure()) {
configureEncrypted(ch);
} else {
configureClearText(ch);
}
logger.log(Level.FINE, () -> "initChannel pipeline handler names = " + ch.pipeline().names());
}
Http2SettingsHandler getHttp2SettingsHandler() {
return http2SettingsHandler;
}
Http2Handler getHttp2Handler() {
return http2Handler;
logger.log(Level.FINE, () -> "initChannel complete, pipeline handler names = " + ch.pipeline().names());
}
private void configureClearText(SocketChannel ch) {
@ -134,6 +135,7 @@ class HttpClientChannelInitializer extends ChannelInitializer<SocketChannel> {
configureHttp1Pipeline(pipeline);
} else if (key.getVersion().majorVersion() == 2) {
HttpToHttp2ConnectionHandler http2connectionHandler = createHttp2ConnectionHandler();
// using the upgrade handler means mixed HTTP 1 and HTTP 2 on the same connection
if (context.isInstallHttp2Upgrade()) {
HttpClientCodec http1connectionHandler = createHttp1ConnectionHandler();
Http2ClientUpgradeCodec upgradeCodec =
@ -154,30 +156,32 @@ class HttpClientChannelInitializer extends ChannelInitializer<SocketChannel> {
private void configureEncrypted(SocketChannel ch) throws SSLException {
ChannelPipeline pipeline = ch.pipeline();
if (key.getVersion().majorVersion() == 2) {
final SslContext http2SslContext = SslContextBuilder.forClient()
SslContextBuilder sslContextBuilder = SslContextBuilder.forClient()
.sslProvider(context.getSslProvider())
.keyManager(context.getKeyCertChainInputStream(), context.getKeyInputStream(), context.getKeyPassword())
.ciphers(context.getCiphers(), context.getCipherSuiteFilter())
.trustManager(context.getTrustManagerFactory())
.applicationProtocolConfig(new ApplicationProtocolConfig(
.trustManager(context.getTrustManagerFactory());
if (key.getVersion().majorVersion() == 2) {
sslContextBuilder.applicationProtocolConfig(new ApplicationProtocolConfig(
ApplicationProtocolConfig.Protocol.ALPN,
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
ApplicationProtocolNames.HTTP_2,
ApplicationProtocolNames.HTTP_1_1))
.build();
SslHandler sslHandler = http2SslContext.newHandler(ch.alloc());
try {
ApplicationProtocolNames.HTTP_1_1));
}
SslHandler sslHandler = sslContextBuilder.build().newHandler(ch.alloc());
SSLEngine engine = sslHandler.engine();
try {
if (context.isUseServerNameIdentification()) {
// execute DNS lookup and/or reverse lookup if IP for host name
String fullQualifiedHostname = key.getInetSocketAddress().getHostName();
SSLParameters params = engine.getSSLParameters();
params.setServerNames(Arrays.asList(new SNIServerName[]{new SNIHostName(fullQualifiedHostname)}));
engine.setSSLParameters(params);
}
switch (context.getSslClientAuthMode()) {
} finally {
pipeline.addLast(sslHandler);
}
switch (context.getClientAuthMode()) {
case NEED:
engine.setNeedClientAuth(true);
break;
@ -187,32 +191,12 @@ class HttpClientChannelInitializer extends ChannelInitializer<SocketChannel> {
default:
break;
}
} finally {
pipeline.addLast(sslHandler);
}
pipeline.addLast(new Http2NegotiationHandler(ApplicationProtocolNames.HTTP_1_1));
} else if (key.getVersion().majorVersion() == 1) {
final SslContext hhtp1SslContext = SslContextBuilder.forClient()
.sslProvider(context.getSslProvider())
.keyManager(context.getKeyCertChainInputStream(), context.getKeyInputStream(), context.getKeyPassword())
.ciphers(context.getCiphers(), context.getCipherSuiteFilter())
.trustManager(context.getTrustManagerFactory())
.build();
SslHandler sslHandler = hhtp1SslContext.newHandler(ch.alloc());
switch (context.getSslClientAuthMode()) {
case NEED:
sslHandler.engine().setNeedClientAuth(true);
break;
case WANT:
sslHandler.engine().setWantClientAuth(true);
break;
default:
break;
}
pipeline.addLast(sslHandler);
if (key.getVersion().majorVersion() == 1) {
HttpClientCodec http1connectionHandler = createHttp1ConnectionHandler();
pipeline.addLast(http1connectionHandler);
configureHttp1Pipeline(pipeline);
} else if (key.getVersion().majorVersion() == 2) {
pipeline.addLast(new Http2NegotiationHandler(ApplicationProtocolNames.HTTP_1_1));
}
}
@ -224,13 +208,12 @@ class HttpClientChannelInitializer extends ChannelInitializer<SocketChannel> {
new HttpObjectAggregator(context.getMaxContentLength(), false);
httpObjectAggregator.setMaxCumulationBufferComponents(context.getMaxCompositeBufferComponents());
pipeline.addLast(httpObjectAggregator);
pipeline.addLast(http1Handler);
pipeline.addLast(httpHandler);
}
private void configureHttp2Pipeline(ChannelPipeline pipeline) {
pipeline.addLast(http2SettingsHandler);
pipeline.addLast(userEventLogger);
pipeline.addLast(http2Handler);
pipeline.addLast(new UserEventLogger());
pipeline.addLast(http2ResponseHandler);
}
private HttpClientCodec createHttp1ConnectionHandler() {
@ -241,13 +224,9 @@ class HttpClientChannelInitializer extends ChannelInitializer<SocketChannel> {
final Http2Connection http2Connection = new DefaultHttp2Connection(false);
return new HttpToHttp2ConnectionHandlerBuilder()
.connection(http2Connection)
.frameLogger(frameLogger)
.frameLogger(new Http2FrameLogger(LogLevel.TRACE, HttpClientChannelInitializer.class))
.frameListener(new DelegatingDecompressorFrameListener(http2Connection,
new InboundHttp2ToHttpAdapterBuilder(http2Connection)
.maxContentLength(context.getMaxContentLength())
.propagateSettings(true)
.validateHttpHeaders(false)
.build()))
new Http2EventHandler(http2Connection, context.getMaxContentLength(), false)))
.build();
}
@ -263,81 +242,37 @@ class HttpClientChannelInitializer extends ChannelInitializer<SocketChannel> {
HttpToHttp2ConnectionHandler http2connectionHandler = createHttp2ConnectionHandler();
ctx.pipeline().addLast(http2connectionHandler);
configureHttp2Pipeline(ctx.pipeline());
logger.log(Level.FINE, "negotiated HTTP/2: handler = " + ctx.pipeline().names());
logger.log(Level.FINE, () -> "negotiated HTTP/2: handler = " + ctx.pipeline().names());
return;
}
if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
HttpClientCodec http1connectionHandler = createHttp1ConnectionHandler();
ctx.pipeline().addLast(http1connectionHandler);
configureHttp1Pipeline(ctx.pipeline());
logger.log(Level.FINE, "negotiated HTTP/1.1: handler = " + ctx.pipeline().names());
logger.log(Level.FINE, () -> "negotiated HTTP/1.1: handler = " + ctx.pipeline().names());
return;
}
// close and fail
ctx.close();
throw new IllegalStateException("unexpected protocol: " + protocol);
}
}
class Http2SettingsHandler extends SimpleChannelInboundHandler<Http2Settings> {
private final ChannelPromise promise;
Http2SettingsHandler(ChannelPromise promise) {
this.promise = promise;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, Http2Settings msg) throws Exception {
promise.setSuccess();
ctx.pipeline().remove(this);
logger.log(Level.FINE, "settings handler removed, pipeline = " + ctx.pipeline().names());
}
/**
* Forward channel exceptions to the exception listener.
* @param ctx the channel handler context
* @param cause the cause of the exception
* @throws Exception if forwarding fails
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ExceptionListener exceptionListener =
ctx.channel().attr(HttpClientChannelContext.EXCEPTION_LISTENER_ATTRIBUTE_KEY).get();
logger.log(Level.FINE, () -> "exceptionCaught");
if (exceptionListener != null) {
exceptionListener.onException(cause);
}
final HttpRequestContext httpRequestContext =
ctx.channel().attr(HttpClientChannelContext.REQUEST_CONTEXT_ATTRIBUTE_KEY).get();
httpRequestContext.fail(cause.getMessage());
}
void awaitSettings(HttpRequestContext httpRequestContext, ExceptionListener exceptionListener) throws Exception {
int timeout = httpRequestContext.getTimeout();
if (!promise.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS)) {
IllegalStateException exception = new IllegalStateException("time out while waiting for HTTP/2 settings");
if (exceptionListener != null) {
exceptionListener.onException(exception);
httpRequestContext.fail(exception.getMessage());
}
throw exception;
}
if (!promise.isSuccess()) {
throw new RuntimeException(promise.cause());
}
}
}
@Sharable
private class UpgradeRequestHandler extends ChannelInboundHandlerAdapter {
/**
* Send an upgrade request if channel becomes active.
* @param ctx the channel handler context
* @throws Exception if upgrade request sending fails
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
DefaultFullHttpRequest upgradeRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/");
ctx.writeAndFlush(upgradeRequest);
super.channelActive(ctx);
ctx.pipeline().remove(this);
logger.log(Level.FINE, "upgrade request handler removed, pipeline = " + ctx.pipeline().names());
logger.log(Level.FINE, () -> "upgrade request handler removed, pipeline = " + ctx.pipeline().names());
}
/**
@ -348,9 +283,9 @@ class HttpClientChannelInitializer extends ChannelInitializer<SocketChannel> {
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.log(Level.FINE, () -> "exceptionCaught " + cause.getMessage());
ExceptionListener exceptionListener =
ctx.channel().attr(HttpClientChannelContext.EXCEPTION_LISTENER_ATTRIBUTE_KEY).get();
logger.log(Level.FINE, () -> "exceptionCaught");
if (exceptionListener != null) {
exceptionListener.onException(cause);
}
@ -360,6 +295,9 @@ class HttpClientChannelInitializer extends ChannelInitializer<SocketChannel> {
}
}
/**
* A Netty handler that logs user events and find expetced ones.
*/
@Sharable
private class UserEventLogger extends ChannelInboundHandlerAdapter {
@ -369,11 +307,46 @@ class HttpClientChannelInitializer extends ChannelInitializer<SocketChannel> {
if (evt instanceof Http2ConnectionPrefaceWrittenEvent ||
evt instanceof SslCloseCompletionEvent ||
evt instanceof ChannelInputShutdownReadComplete) {
// Expected events
// log expected events
logger.log(Level.FINE, () -> "user event is expected: " + evt);
return;
}
super.userEventTriggered(ctx, evt);
}
}
/**
* A Netty handler that logs the I/O traffic of a connection.
*/
@Sharable
private final class TrafficLoggingHandler extends LoggingHandler {
TrafficLoggingHandler() {
super("client", LogLevel.TRACE);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof ByteBuf && !((ByteBuf) msg).isReadable()) {
ctx.write(msg, promise);
} else {
super.write(ctx, msg, promise);
}
}
}
}

View file

@ -18,6 +18,7 @@ package org.xbib.netty.http.client;
import io.netty.channel.Channel;
import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.socket.SocketChannel;
import org.xbib.netty.http.client.util.InetAddressKey;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
@ -45,7 +46,7 @@ public class HttpClientChannelPoolHandler implements ChannelPoolHandler {
@Override
public void channelCreated(Channel ch) throws Exception {
logger.log(Level.INFO, () -> "channel created " + ch + " key:" + key);
logger.log(Level.FINE, () -> "channel created " + ch + " key:" + key);
channelInitializer.initChannel((SocketChannel) ch, key);
int n = active.incrementAndGet();
if (n > peak) {
@ -55,12 +56,12 @@ public class HttpClientChannelPoolHandler implements ChannelPoolHandler {
@Override
public void channelAcquired(Channel ch) throws Exception {
logger.log(Level.INFO, () -> "channel acquired from pool " + ch);
logger.log(Level.FINE, () -> "channel acquired from pool " + ch);
}
@Override
public void channelReleased(Channel ch) throws Exception {
logger.log(Level.INFO, () -> "channel released to pool " + ch);
logger.log(Level.FINE, () -> "channel released to pool " + ch);
active.decrementAndGet();
}

View file

@ -18,6 +18,7 @@ package org.xbib.netty.http.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.pool.AbstractChannelPoolMap;
import io.netty.channel.pool.FixedChannelPool;
import org.xbib.netty.http.client.util.InetAddressKey;
/**
*
@ -49,7 +50,7 @@ public class HttpClientChannelPoolMap extends AbstractChannelPoolMap<InetAddress
@Override
protected FixedChannelPool newPool(InetAddressKey key) {
this.httpClientChannelInitializer = new HttpClientChannelInitializer(httpClientChannelContext,
new Http1Handler(httpClient), new Http2Handler(httpClient));
new HttpHandler(httpClient), new Http2ResponseHandler(httpClient));
this.httpClientChannelPoolHandler = new HttpClientChannelPoolHandler(httpClientChannelInitializer, key);
return new FixedChannelPool(bootstrap.remoteAddress(key.getInetSocketAddress()),
httpClientChannelPoolHandler, maxConnections);

View file

@ -11,41 +11,57 @@ import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.handler.codec.http.QueryStringEncoder;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http2.HttpConversionUtil;
import io.netty.util.AsciiString;
import io.netty.util.CharsetUtil;
import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.ExceptionListener;
import org.xbib.netty.http.client.listener.HttpPushListener;
import org.xbib.netty.http.client.listener.HttpHeadersListener;
import org.xbib.netty.http.client.listener.HttpResponseListener;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
*
*/
public class HttpClientRequestBuilder implements HttpRequestBuilder, HttpRequestDefaults {
private static final AtomicInteger streamId = new AtomicInteger(3);
private static final Logger logger = Logger.getLogger(HttpClientRequestBuilder.class.getName());
private final HttpClient httpClient;
private final ByteBufAllocator byteBufAllocator;
private final AtomicInteger streamId;
private final DefaultHttpHeaders headers;
private final List<String> removeHeaders;
private final Set<Cookie> cookies;
private final HttpMethod httpMethod;
private int timeout = DEFAULT_TIMEOUT_MILLIS;
@ -60,9 +76,11 @@ public class HttpClientRequestBuilder implements HttpRequestBuilder, HttpRequest
private int maxRedirects = DEFAULT_MAX_REDIRECT;
private URL url;
private URI uri;
private ByteBuf body;
private QueryStringEncoder queryStringEncoder;
private ByteBuf content;
private HttpRequest httpRequest;
@ -72,6 +90,12 @@ public class HttpClientRequestBuilder implements HttpRequestBuilder, HttpRequest
private ExceptionListener exceptionListener;
private HttpHeadersListener httpHeadersListener;
private CookieListener cookieListener;
private HttpPushListener httpPushListener;
/**
* Construct HTTP client request builder.
*
@ -79,12 +103,15 @@ public class HttpClientRequestBuilder implements HttpRequestBuilder, HttpRequest
* @param httpMethod HTTP method
* @param byteBufAllocator byte buf allocator
*/
HttpClientRequestBuilder(HttpClient httpClient, HttpMethod httpMethod, ByteBufAllocator byteBufAllocator) {
HttpClientRequestBuilder(HttpClient httpClient, HttpMethod httpMethod,
ByteBufAllocator byteBufAllocator, int streamId) {
this.httpClient = httpClient;
this.httpMethod = httpMethod;
this.byteBufAllocator = byteBufAllocator;
this.streamId = new AtomicInteger(streamId);
this.headers = new DefaultHttpHeaders();
this.removeHeaders = new ArrayList<>();
this.cookies = new HashSet<>();
}
@Override
@ -93,24 +120,19 @@ public class HttpClientRequestBuilder implements HttpRequestBuilder, HttpRequest
return this;
}
protected int getTimeout() {
return timeout;
}
@Override
public HttpRequestBuilder setURL(String url) {
try {
this.url = new URL(url);
} catch (MalformedURLException e) {
throw new UncheckedIOException(e);
this.uri = URI.create(url);
QueryStringDecoder queryStringDecoder = new QueryStringDecoder(uri, StandardCharsets.UTF_8);
this.queryStringEncoder = new QueryStringEncoder(queryStringDecoder.path());
for (Map.Entry<String, List<String>> entry : queryStringDecoder.parameters().entrySet()) {
for (String value : entry.getValue()) {
queryStringEncoder.addParam(entry.getKey(), value);
}
}
return this;
}
protected URL getURL() {
return url;
}
@Override
public HttpRequestBuilder addHeader(String name, Object value) {
headers.add(name, value);
@ -129,6 +151,20 @@ public class HttpClientRequestBuilder implements HttpRequestBuilder, HttpRequest
return this;
}
@Override
public HttpRequestBuilder addParam(String name, String value) {
if (queryStringEncoder != null) {
queryStringEncoder.addParam(name, value);
}
return this;
}
@Override
public HttpRequestBuilder addCookie(Cookie cookie) {
cookies.add(cookie);
return this;
}
@Override
public HttpRequestBuilder contentType(String contentType) {
addHeader(HttpHeaderNames.CONTENT_TYPE, contentType);
@ -141,10 +177,6 @@ public class HttpClientRequestBuilder implements HttpRequestBuilder, HttpRequest
return this;
}
protected HttpVersion getVersion() {
return httpVersion;
}
@Override
public HttpRequestBuilder acceptGzip(boolean gzip) {
this.gzip = gzip;
@ -157,20 +189,12 @@ public class HttpClientRequestBuilder implements HttpRequestBuilder, HttpRequest
return this;
}
protected boolean isFollowRedirect() {
return followRedirect;
}
@Override
public HttpRequestBuilder setMaxRedirects(int maxRedirects) {
this.maxRedirects = maxRedirects;
return this;
}
protected int getMaxRedirects() {
return maxRedirects;
}
@Override
public HttpRequestBuilder setUserAgent(String userAgent) {
this.userAgent = userAgent;
@ -179,60 +203,90 @@ public class HttpClientRequestBuilder implements HttpRequestBuilder, HttpRequest
@Override
public HttpRequestBuilder text(String text) throws IOException {
setBody(text, HttpHeaderValues.TEXT_PLAIN);
content(text, HttpHeaderValues.TEXT_PLAIN);
return this;
}
@Override
public HttpRequestBuilder json(String json) throws IOException {
setBody(json, HttpHeaderValues.APPLICATION_JSON);
content(json, HttpHeaderValues.APPLICATION_JSON);
return this;
}
@Override
public HttpRequestBuilder xml(String xml) throws IOException {
setBody(xml, "application/xml");
content(xml, "application/xml");
return this;
}
@Override
public HttpRequestBuilder setBody(CharSequence charSequence, String contentType) throws IOException {
setBody(charSequence.toString().getBytes(CharsetUtil.UTF_8), AsciiString.of(contentType));
public HttpRequestBuilder content(CharSequence charSequence, String contentType) throws IOException {
content(charSequence.toString().getBytes(CharsetUtil.UTF_8), AsciiString.of(contentType));
return this;
}
@Override
public HttpRequestBuilder setBody(byte[] buf, String contentType) throws IOException {
setBody(buf, AsciiString.of(contentType));
public HttpRequestBuilder content(byte[] buf, String contentType) throws IOException {
content(buf, AsciiString.of(contentType));
return this;
}
@Override
public HttpRequestBuilder setBody(ByteBuf body, String contentType) throws IOException {
setBody(body, AsciiString.of(contentType));
public HttpRequestBuilder content(ByteBuf body, String contentType) throws IOException {
content(body, AsciiString.of(contentType));
return this;
}
@Override
public HttpRequestBuilder onHeaders(HttpHeadersListener httpHeadersListener) {
this.httpHeadersListener = httpHeadersListener;
return this;
}
@Override
public HttpRequestBuilder onCookie(CookieListener cookieListener) {
this.cookieListener = cookieListener;
return this;
}
@Override
public HttpRequestBuilder onResponse(HttpResponseListener httpResponseListener) {
this.httpResponseListener = httpResponseListener;
return this;
}
@Override
public HttpRequestBuilder onException(ExceptionListener exceptionListener) {
this.exceptionListener = exceptionListener;
return this;
}
@Override
public HttpRequestBuilder onPushReceived(HttpPushListener httpPushListener) {
this.httpPushListener = httpPushListener;
return this;
}
@Override
public HttpRequest build() {
if (url == null) {
if (uri == null) {
throw new IllegalStateException("URL not set");
}
if (url.getHost() == null) {
throw new IllegalStateException("URL host not set: " + url);
if (uri.getHost() == null) {
throw new IllegalStateException("URL host not set: " + uri);
}
DefaultHttpRequest httpRequest = createHttpRequest();
String scheme = url.getProtocol();
StringBuilder sb = new StringBuilder(url.getHost());
String scheme = uri.getScheme();
StringBuilder sb = new StringBuilder(uri.getHost());
int defaultPort = "http".equals(scheme) ? 80 : "https".equals(scheme) ? 443 : -1;
if (defaultPort != -1 && url.getPort() != -1 && defaultPort != url.getPort()) {
sb.append(":").append(url.getPort());
if (defaultPort != -1 && uri.getPort() != -1 && defaultPort != uri.getPort()) {
sb.append(":").append(uri.getPort());
}
if (httpVersion.majorVersion() == 2) {
// this is a hack, because we only use the "origin-form" in request URIs
httpRequest.headers().set(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), scheme);
}
httpRequest.headers().add(HttpHeaderNames.HOST, sb.toString());
String host = sb.toString();
httpRequest.headers().add(HttpHeaderNames.HOST, host);
httpRequest.headers().add(HttpHeaderNames.DATE,
DateTimeFormatter.RFC_1123_DATE_TIME.format(ZonedDateTime.ofInstant(Instant.now(), ZoneId.of("GMT"))));
if (userAgent != null) {
@ -258,31 +312,57 @@ public class HttpClientRequestBuilder implements HttpRequestBuilder, HttpRequest
return httpRequest;
}
private DefaultHttpRequest createHttpRequest() {
// Regarding request-target URI:
// RFC https://tools.ietf.org/html/rfc7230#section-5.3.2
// would allow url.toExternalForm as absolute-form,
// but some servers do not support that. So, we create origin-form.
// But for HTTP/2, we should create the absolute-form, otherwise
// netty will throw "java.lang.IllegalArgumentException: :scheme must be specified."
String requestTarget = toOriginForm(url);
return body == null ?
new DefaultHttpRequest(httpVersion, httpMethod, requestTarget) :
new DefaultFullHttpRequest(httpVersion, httpMethod, requestTarget, body);
@Override
public HttpRequestContext execute() {
if (httpRequest == null) {
httpRequest = build();
}
if (httpResponseListener == null) {
httpResponseListener = httpRequestContext;
}
httpRequestContext = new HttpRequestContext(uri, httpRequest, streamId,
new AtomicBoolean(false),
new AtomicBoolean(false),
timeout, System.currentTimeMillis(),
followRedirect, maxRedirects, new AtomicInteger(0),
new CountDownLatch(1),
httpResponseListener,
exceptionListener,
httpHeadersListener,
cookieListener,
httpPushListener);
// copy cookie(s) to context, will be added later to headers in dispatch (because of auto-cookie setting while redirect)
if (!cookies.isEmpty()) {
for (Cookie cookie : cookies) {
httpRequestContext.addCookie(cookie);
}
}
httpClient.dispatch(httpRequestContext);
return httpRequestContext;
}
private String toOriginForm(URL base) {
@Override
public <T> CompletableFuture<T> execute(Function<FullHttpResponse, T> supplier) {
final CompletableFuture<T> completableFuture = new CompletableFuture<>();
onResponse(response -> completableFuture.complete(supplier.apply(response)));
onException(completableFuture::completeExceptionally);
execute();
return completableFuture;
}
private DefaultHttpRequest createHttpRequest() {
String requestTarget = toOriginForm();
logger.log(Level.FINE, () -> "origin form is " + requestTarget);
return content == null ?
new DefaultHttpRequest(httpVersion, httpMethod, requestTarget) :
new DefaultFullHttpRequest(httpVersion, httpMethod, requestTarget, content);
}
private String toOriginForm() {
StringBuilder sb = new StringBuilder();
String path = base.getPath() != null && !base.getPath().isEmpty() ? base.getPath() : "/";
String query = base.getQuery();
String ref = base.getRef();
if (path.charAt(0) != '/') {
sb.append('/');
}
sb.append(path);
if (query != null && !query.isEmpty()) {
sb.append('?').append(query);
}
String pathAndQuery = queryStringEncoder.toString();
sb.append(pathAndQuery.isEmpty() ? "/" : pathAndQuery);
String ref = uri.getFragment();
if (ref != null && !ref.isEmpty()) {
sb.append('#').append(ref);
}
@ -293,60 +373,18 @@ public class HttpClientRequestBuilder implements HttpRequestBuilder, HttpRequest
headers.add(name, value);
}
private void setBody(CharSequence charSequence, AsciiString contentType) throws IOException {
setBody(charSequence.toString().getBytes(CharsetUtil.UTF_8), contentType);
private void content(CharSequence charSequence, AsciiString contentType) throws IOException {
content(charSequence.toString().getBytes(CharsetUtil.UTF_8), contentType);
}
private void setBody(byte[] buf, AsciiString contentType) throws IOException {
private void content(byte[] buf, AsciiString contentType) throws IOException {
ByteBuf buffer = byteBufAllocator.buffer(buf.length).writeBytes(buf);
setBody(buffer, contentType);
content(buffer, contentType);
}
private void setBody(ByteBuf body, AsciiString contentType) throws IOException {
this.body = body;
private void content(ByteBuf body, AsciiString contentType) throws IOException {
this.content = body;
addHeader(HttpHeaderNames.CONTENT_LENGTH, (long) body.readableBytes());
addHeader(HttpHeaderNames.CONTENT_TYPE, contentType);
}
@Override
public HttpRequestBuilder onResponse(HttpResponseListener httpResponseListener) {
this.httpResponseListener = httpResponseListener;
return this;
}
@Override
public HttpRequestBuilder onError(ExceptionListener exceptionListener) {
this.exceptionListener = exceptionListener;
return this;
}
@Override
public HttpRequestContext execute() {
if (httpRequest == null) {
httpRequest = build();
}
if (httpRequestContext == null) {
httpRequestContext = new HttpRequestContext(getURL(),
httpRequest,
new AtomicBoolean(false),
new AtomicBoolean(false),
getTimeout(), System.currentTimeMillis(),
isFollowRedirect(), getMaxRedirects(), new AtomicInteger(0),
new CountDownLatch(1), streamId.get());
}
if (httpResponseListener == null) {
httpResponseListener = httpRequestContext;
}
httpClient.dispatch(httpRequestContext, httpResponseListener, exceptionListener);
return httpRequestContext;
}
@Override
public <T> CompletableFuture<T> execute(Function<FullHttpResponse, T> supplier) {
final CompletableFuture<T> completableFuture = new CompletableFuture<>();
onResponse(response -> completableFuture.complete(supplier.apply(response)));
onError(completableFuture::completeExceptionally);
execute();
return completableFuture;
}
}

View file

@ -20,21 +20,29 @@ import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.pool.ChannelPool;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.cookie.ClientCookieDecoder;
import io.netty.handler.codec.http.cookie.Cookie;
import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.ExceptionListener;
import org.xbib.netty.http.client.listener.HttpHeadersListener;
import org.xbib.netty.http.client.listener.HttpResponseListener;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Netty channel handler for HTTP 1.1.
* HTTP 1.x Netty channel handler.
*/
@ChannelHandler.Sharable
final class Http1Handler extends ChannelInboundHandlerAdapter {
final class HttpHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = Logger.getLogger(Http1Handler.class.getName());
private static final Logger logger = Logger.getLogger(HttpHandler.class.getName());
private final HttpClient httpClient;
Http1Handler(HttpClient httpClient) {
HttpHandler(HttpClient httpClient) {
this.httpClient = httpClient;
}
@ -52,16 +60,34 @@ final class Http1Handler extends ChannelInboundHandlerAdapter {
ctx.channel().attr(HttpClientChannelContext.REQUEST_CONTEXT_ATTRIBUTE_KEY).get();
if (msg instanceof FullHttpResponse) {
FullHttpResponse httpResponse = (FullHttpResponse) msg;
HttpHeaders httpHeaders = httpResponse.headers();
HttpHeadersListener httpHeadersListener =
ctx.channel().attr(HttpClientChannelContext.HEADER_LISTENER_ATTRIBUTE_KEY).get();
if (httpHeadersListener != null) {
logger.log(Level.FINE, () -> "firing onHeaders");
httpHeadersListener.onHeaders(httpHeaders);
}
CookieListener cookieListener =
ctx.channel().attr(HttpClientChannelContext.COOKIE_LISTENER_ATTRIBUTE_KEY).get();
for (String cookieString : httpHeaders.getAll(HttpHeaderNames.SET_COOKIE)) {
Cookie cookie = ClientCookieDecoder.STRICT.decode(cookieString);
httpRequestContext.addCookie(cookie);
if (cookieListener != null) {
logger.log(Level.FINE, () -> "firing onCookie");
cookieListener.onCookie(cookie);
}
}
HttpResponseListener httpResponseListener =
ctx.channel().attr(HttpClientChannelContext.RESPONSE_LISTENER_ATTRIBUTE_KEY).get();
if (httpResponseListener != null) {
logger.log(Level.FINE, () -> "firing onResponse");
httpResponseListener.onResponse(httpResponse);
}
logger.log(Level.FINE, () -> "trying redirect");
if (httpClient.tryRedirect(ctx.channel(), httpResponse, httpRequestContext)) {
return;
}
logger.log(Level.FINE, () -> "success");
httpRequestContext.success("response arrived");
httpRequestContext.success("response finished");
final ChannelPool channelPool =
ctx.channel().attr(HttpClientChannelContext.CHANNEL_POOL_ATTRIBUTE_KEY).get();
channelPool.release(ctx.channel());

View file

@ -18,6 +18,12 @@ package org.xbib.netty.http.client;
import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.cookie.Cookie;
import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.ExceptionListener;
import org.xbib.netty.http.client.listener.HttpPushListener;
import org.xbib.netty.http.client.listener.HttpHeadersListener;
import org.xbib.netty.http.client.listener.HttpResponseListener;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
@ -39,6 +45,10 @@ public interface HttpRequestBuilder {
HttpRequestBuilder removeHeader(String name);
HttpRequestBuilder addParam(String name, String value);
HttpRequestBuilder addCookie(Cookie cookie);
HttpRequestBuilder contentType(String contentType);
HttpRequestBuilder acceptGzip(boolean gzip);
@ -49,7 +59,7 @@ public interface HttpRequestBuilder {
HttpRequestBuilder setUserAgent(String userAgent);
HttpRequestBuilder setBody(CharSequence charSequence, String contentType) throws IOException;
HttpRequestBuilder content(CharSequence charSequence, String contentType) throws IOException;
HttpRequestBuilder text(String text) throws IOException;
@ -57,14 +67,20 @@ public interface HttpRequestBuilder {
HttpRequestBuilder xml(String xmlText) throws IOException;
HttpRequestBuilder setBody(byte[] buf, String contentType) throws IOException;
HttpRequestBuilder content(byte[] buf, String contentType) throws IOException;
HttpRequestBuilder setBody(ByteBuf body, String contentType) throws IOException;
HttpRequestBuilder content(ByteBuf body, String contentType) throws IOException;
HttpRequestBuilder onError(ExceptionListener exceptionListener);
HttpRequestBuilder onHeaders(HttpHeadersListener httpHeadersListener);
HttpRequestBuilder onCookie(CookieListener cookieListener);
HttpRequestBuilder onResponse(HttpResponseListener httpResponseListener);
HttpRequestBuilder onException(ExceptionListener exceptionListener);
HttpRequestBuilder onPushReceived(HttpPushListener httpPushListener);
HttpRequest build();
HttpRequestContext execute();

View file

@ -15,16 +15,32 @@
*/
package org.xbib.netty.http.client;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.util.internal.PlatformDependent;
import org.xbib.netty.http.client.listener.CookieListener;
import org.xbib.netty.http.client.listener.ExceptionListener;
import org.xbib.netty.http.client.listener.HttpPushListener;
import org.xbib.netty.http.client.listener.HttpHeadersListener;
import org.xbib.netty.http.client.listener.HttpResponseListener;
import org.xbib.netty.http.client.util.LimitedHashSet;
import java.net.URL;
import java.net.URI;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
/**
*
@ -33,7 +49,7 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque
private static final Logger logger = Logger.getLogger(HttpRequestContext.class.getName());
private final URL url;
private final URI uri;
private final HttpRequest httpRequest;
@ -53,19 +69,43 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque
private final CountDownLatch latch;
private final Integer streamId;
private final AtomicInteger streamId;
private FullHttpResponse httpResponse;
private final HttpResponseListener httpResponseListener;
private final ExceptionListener exceptionListener;
private final HttpHeadersListener httpHeadersListener;
private final CookieListener cookieListener;
private final HttpPushListener httpPushListener;
private final Map<Integer, Map.Entry<ChannelFuture, ChannelPromise>> promiseMap;
private final Map<Integer, Map.Entry<Http2Headers, ChannelPromise>> pushMap;
private ChannelPromise settingsPromise;
private Collection<Cookie> cookies;
private Map<Integer, FullHttpResponse> httpResponses;
private Long stopTime;
HttpRequestContext(URL url, HttpRequest httpRequest,
HttpRequestContext(URI uri, HttpRequest httpRequest, AtomicInteger streamId,
AtomicBoolean succeeded, AtomicBoolean failed,
int timeout, Long startTime,
boolean followRedirect, int maxRedirects, AtomicInteger redirectCount,
CountDownLatch latch, Integer streamId) {
this.url = url;
CountDownLatch latch,
HttpResponseListener httpResponseListener,
ExceptionListener exceptionListener,
HttpHeadersListener httpHeadersListener,
CookieListener cookieListener,
HttpPushListener httpPushListener) {
this.uri = uri;
this.httpRequest = httpRequest;
this.streamId = streamId;
this.succeeded = succeeded;
this.failed = failed;
this.timeout = timeout;
@ -74,12 +114,24 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque
this.maxRedirects = maxRedirects;
this.redirectCount = redirectCount;
this.latch = latch;
this.streamId = streamId;
this.httpResponseListener = httpResponseListener;
this.exceptionListener = exceptionListener;
this.httpHeadersListener = httpHeadersListener;
this.cookieListener = cookieListener;
this.httpPushListener = httpPushListener;
this.promiseMap = PlatformDependent.newConcurrentHashMap();
this.pushMap = PlatformDependent.newConcurrentHashMap();
this.cookies = new LimitedHashSet<>(10);
}
HttpRequestContext(URL url, HttpRequest httpRequest, HttpRequestContext httpRequestContext) {
this.url = url;
/**
* A follow-up request to a given context with same stream ID (redirect).
*
*/
HttpRequestContext(URI uri, HttpRequest httpRequest, HttpRequestContext httpRequestContext) {
this.uri = uri;
this.httpRequest = httpRequest;
this.streamId = httpRequestContext.streamId;
this.succeeded = httpRequestContext.succeeded;
this.failed = httpRequestContext.failed;
this.failed.lazySet(false); // reset
@ -89,17 +141,105 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque
this.maxRedirects = httpRequestContext.maxRedirects;
this.redirectCount = httpRequestContext.redirectCount;
this.latch = httpRequestContext.latch;
this.streamId = httpRequestContext.streamId;
this.httpResponseListener = httpRequestContext.httpResponseListener;
this.exceptionListener = httpRequestContext.exceptionListener;
this.httpHeadersListener = httpRequestContext.httpHeadersListener;
this.cookieListener = httpRequestContext.cookieListener;
this.httpPushListener = httpRequestContext.httpPushListener;
this.promiseMap = httpRequestContext.promiseMap;
this.pushMap = httpRequestContext.pushMap;
this.cookies = httpRequestContext.cookies;
}
public URL getURL() {
return url;
public URI getURI() {
return uri;
}
public HttpRequest getHttpRequest() {
return httpRequest;
}
public HttpResponseListener getHttpResponseListener() {
return httpResponseListener;
}
public ExceptionListener getExceptionListener() {
return exceptionListener;
}
public HttpHeadersListener getHttpHeadersListener() {
return httpHeadersListener;
}
public CookieListener getCookieListener() {
return cookieListener;
}
public HttpPushListener getHttpPushListener() {
return httpPushListener;
}
public void setSettingsPromise(ChannelPromise settingsPromise) {
this.settingsPromise = settingsPromise;
}
public ChannelPromise getSettingsPromise() {
return settingsPromise;
}
public Map<Integer, Map.Entry<ChannelFuture, ChannelPromise>> getStreamIdPromiseMap() {
return promiseMap;
}
public void putStreamID(Integer streamId, ChannelFuture channelFuture, ChannelPromise channelPromise) {
logger.log(Level.FINE, () -> "put stream ID " + streamId + " future = " + channelFuture);
promiseMap.put(streamId, new AbstractMap.SimpleEntry<>(channelFuture, channelPromise));
}
public Map<Integer, Map.Entry<Http2Headers, ChannelPromise>> getPushMap() {
return pushMap;
}
public void receiveStreamID(Integer streamId, Http2Headers headers, ChannelPromise channelPromise) {
logger.log(Level.FINE, () -> "receive stream ID " + streamId + " " + headers);
pushMap.put(streamId, new AbstractMap.SimpleEntry<>(headers, channelPromise));
}
public boolean isFinished() {
return promiseMap.isEmpty() && pushMap.isEmpty();
}
public void addCookie(Cookie cookie) {
cookies.add(cookie);
}
public Collection<Cookie> getCookies() {
return cookies;
}
public List<Cookie> matchCookies() {
return cookies.stream()
.filter(this::matchCookie)
.collect(Collectors.toList());
}
private boolean matchCookie(Cookie cookie) {
boolean domainMatch = cookie.domain() == null || uri.getHost().endsWith(cookie.domain());
if (!domainMatch) {
return false;
}
boolean pathMatch = "/".equals(cookie.path()) || uri.getPath().startsWith(cookie.path());
if (!pathMatch) {
return false;
}
boolean secure = "https".equals(uri.getScheme());
boolean secureMatch = (secure && cookie.isSecure()) || (!secure && !cookie.isSecure());
if (!secureMatch) {
return false;
}
return true;
}
public int getTimeout() {
return timeout;
}
@ -144,7 +284,7 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque
return latch;
}
public Integer getStreamId() {
public AtomicInteger getStreamId() {
return streamId;
}
@ -162,12 +302,15 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque
logger.log(Level.FINE, () -> "success because of " + reason);
if (succeeded.compareAndSet(false, true)) {
latch.countDown();
}
}
public void fail(String reason) {
logger.log(Level.FINE, () -> "failed because of " + reason);
IllegalStateException exception = new IllegalStateException(reason);
if (exceptionListener != null) {
exceptionListener.onException(exception);
}
if (failed.compareAndSet(false, true)) {
latch.countDown();
}
@ -175,11 +318,10 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque
@Override
public void onResponse(FullHttpResponse fullHttpResponse) {
this.httpResponse = fullHttpResponse;
this.httpResponses.put(streamId.get(), fullHttpResponse);
}
public FullHttpResponse getHttpResponse() {
return httpResponse;
public Map<Integer, FullHttpResponse> getHttpResponses() {
return httpResponses;
}
}

View file

@ -1,56 +0,0 @@
/*
* Copyright 2017 Jörg Prante
*
* Jörg Prante licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.xbib.netty.http.client;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
* A Netty handler that logs the I/O traffic of a connection.
*/
public final class TrafficLoggingHandler extends LoggingHandler {
public TrafficLoggingHandler() {
super("client", LogLevel.TRACE);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof ByteBuf && !((ByteBuf) msg).isReadable()) {
ctx.write(msg, promise);
} else {
super.write(ctx, msg, promise);
}
}
}

View file

@ -0,0 +1,11 @@
package org.xbib.netty.http.client.listener;
import io.netty.handler.codec.http.cookie.Cookie;
/**
*/
@FunctionalInterface
public interface CookieListener {
void onCookie(Cookie cookie);
}

View file

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.xbib.netty.http.client;
package org.xbib.netty.http.client.listener;
/**
*/

View file

@ -0,0 +1,26 @@
/*
* Copyright 2017 Jörg Prante
*
* Jörg Prante licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.xbib.netty.http.client.listener;
import io.netty.handler.codec.http.HttpHeaders;
/**
*/
@FunctionalInterface
public interface HttpHeadersListener {
void onHeaders(HttpHeaders httpHeaders);
}

View file

@ -0,0 +1,29 @@
/*
* Copyright 2017 Jörg Prante
*
* Jörg Prante licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.xbib.netty.http.client.listener;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http2.Http2Headers;
/**
* This listener can forward HTTP push
*
*/
@FunctionalInterface
public interface HttpPushListener {
void onPushReceived(Http2Headers headers, FullHttpResponse fullHttpResponse);
}

View file

@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.xbib.netty.http.client;
package org.xbib.netty.http.client.listener;
import io.netty.handler.codec.http.FullHttpResponse;

View file

@ -0,0 +1,4 @@
/**
* Listeners for Netty HTTP client.
*/
package org.xbib.netty.http.client.listener;

View file

@ -13,11 +13,11 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.xbib.netty.http.client;
package org.xbib.netty.http.client.util;
/**
*
* Client authentication modes, useful for SSL channels.
*/
public enum SslClientAuthMode {
public enum ClientAuthMode {
NONE, WANT, NEED
}

View file

@ -13,66 +13,64 @@
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.xbib.netty.http.client;
package org.xbib.netty.http.client.util;
import io.netty.handler.codec.http.HttpVersion;
import java.net.InetSocketAddress;
import java.net.URL;
/**
* A key for host, port, HTTP version, and secure transport mode of a channel for HTTP.
*/
public class InetAddressKey {
private final InetSocketAddress inetSocketAddress;
private final String host;
private final int port;
private final HttpVersion version;
private final Boolean secure;
InetAddressKey(URL url, HttpVersion version) {
this.version = version;
String protocol = url.getProtocol();
this.secure = "https".equals(protocol);
int port = url.getPort();
if (port == -1) {
port = "http".equals(protocol) ? 80 : (secure ? 443 : -1);
}
this.inetSocketAddress = new InetSocketAddress(url.getHost(), port);
}
private InetSocketAddress inetSocketAddress;
InetAddressKey(InetSocketAddress inetSocketAddress, HttpVersion version, boolean secure) {
this.inetSocketAddress = inetSocketAddress;
public InetAddressKey(String host, int port, HttpVersion version, boolean secure) {
this.host = host;
this.port = port == -1 ? secure ? 443 : 80 : port;
this.version = version;
this.secure = secure;
}
InetSocketAddress getInetSocketAddress() {
public InetSocketAddress getInetSocketAddress() {
if (inetSocketAddress == null) {
this.inetSocketAddress = new InetSocketAddress(host, port);
}
return inetSocketAddress;
}
HttpVersion getVersion() {
public HttpVersion getVersion() {
return version;
}
boolean isSecure() {
public boolean isSecure() {
return secure;
}
public String toString() {
return inetSocketAddress + " (version:" + version + ",secure:" + secure + ")";
return host + ":" + port + " (version:" + version + ",secure:" + secure + ")";
}
@Override
public boolean equals(Object object) {
return object instanceof InetAddressKey &&
inetSocketAddress.equals(((InetAddressKey) object).inetSocketAddress) &&
host.equals(((InetAddressKey) object).host) &&
port == ((InetAddressKey) object).port &&
version.equals(((InetAddressKey) object).version) &&
secure == ((InetAddressKey) object).secure;
}
@Override
public int hashCode() {
return inetSocketAddress.hashCode() ^ version.hashCode() ^ secure.hashCode();
return host.hashCode() ^ port ^ version.hashCode() ^ secure.hashCode();
}
}

View file

@ -0,0 +1,53 @@
/*
* Copyright 2017 Jörg Prante
*
* Jörg Prante licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.xbib.netty.http.client.util;
import java.util.Collection;
import java.util.LinkedHashSet;
/**
* A {@link java.util.Set} with limited size. If the size is exceeded, an exception is thrown.
*/
public final class LimitedHashSet<E> extends LinkedHashSet<E> {
private static final long serialVersionUID = 1838128758142912702L;
private final int max;
public LimitedHashSet(int max) {
this.max = max;
}
@Override
public boolean add(E element) {
if (max < size()) {
throw new IllegalStateException("limit exceeded");
}
return super.add(element);
}
@Override
public boolean addAll(Collection<? extends E> elements) {
boolean b = false;
for (E element : elements) {
if (max < size()) {
throw new IllegalStateException("limit exceeded");
}
b = b || super.add(element);
}
return b;
}
}

View file

@ -0,0 +1,24 @@
/*
* Copyright 2017 Jörg Prante
*
* Jörg Prante licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.xbib.netty.http.client.util;
/**
* The network classes.
*/
public enum NetworkClass {
ANY, LOOPBACK, LOCAL, PUBLIC
}

View file

@ -0,0 +1,24 @@
/*
* Copyright 2017 Jörg Prante
*
* Jörg Prante licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.xbib.netty.http.client.util;
/**
* The TCP/IP network protocol versions.
*/
public enum NetworkProtocolVersion {
IPV4, IPV6, IPV46, NONE
}

View file

@ -0,0 +1,596 @@
/*
* Copyright 2017 Jörg Prante
*
* Jörg Prante licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.xbib.netty.http.client.util;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.InterfaceAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Helper class for Java networking.
*/
public class NetworkUtils {
private static final Logger logger = Logger.getLogger(NetworkUtils.class.getName());
private static final String lf = System.lineSeparator();
private static final char[] hexDigit = new char[]{
'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'
};
private static final String IPV4_SETTING = "java.net.preferIPv4Stack";
private static final String IPV6_SETTING = "java.net.preferIPv6Addresses";
private static InetAddress localAddress;
public static void extendSystemProperties() {
InetAddress address;
try {
address = InetAddress.getLocalHost();
} catch (Exception e) {
logger.log(Level.WARNING, e.getMessage(), e);
address = InetAddress.getLoopbackAddress();
}
localAddress = address;
try {
Map<String, String> map = new HashMap<>();
map.put("net.localhost", address.getCanonicalHostName());
String hostname = address.getHostName();
map.put("net.hostname", hostname);
InetAddress[] hostnameAddresses = InetAddress.getAllByName(hostname);
int i = 0;
for (InetAddress hostnameAddress : hostnameAddresses) {
map.put("net.hostaddress." + (i++), hostnameAddress.getCanonicalHostName());
}
for (NetworkInterface networkInterface : getAllRunningAndUpInterfaces()) {
InetAddress inetAddress = getFirstNonLoopbackAddress(networkInterface, NetworkProtocolVersion.IPV4);
if (inetAddress != null) {
map.put("net." + networkInterface.getDisplayName(), inetAddress.getCanonicalHostName());
}
}
logger.log(Level.FINE, "found network properties for system properties: " + map);
for (Map.Entry<String, String> entry : map.entrySet()) {
System.setProperty(entry.getKey(), entry.getValue());
}
} catch (Throwable e) {
logger.log(Level.WARNING, e.getMessage(), e);
}
}
private NetworkUtils() {
}
public static boolean isPreferIPv4() {
return Boolean.getBoolean(System.getProperty(IPV4_SETTING));
}
public static boolean isPreferIPv6() {
return Boolean.getBoolean(System.getProperty(IPV6_SETTING));
}
public static InetAddress getIPv4Localhost() throws UnknownHostException {
return getLocalhost(NetworkProtocolVersion.IPV4);
}
public static InetAddress getIPv6Localhost() throws UnknownHostException {
return getLocalhost(NetworkProtocolVersion.IPV6);
}
public static InetAddress getLocalhost(NetworkProtocolVersion ipversion) throws UnknownHostException {
return ipversion == NetworkProtocolVersion.IPV4 ?
InetAddress.getByName("127.0.0.1") : InetAddress.getByName("::1");
}
public static String getLocalHostName(String defaultHostName) {
if (localAddress == null) {
return defaultHostName;
}
String hostName = localAddress.getHostName();
if (hostName == null) {
return defaultHostName;
}
return hostName;
}
public static String getLocalHostAddress(String defaultHostAddress) {
if (localAddress == null) {
return defaultHostAddress;
}
String hostAddress = localAddress.getHostAddress();
if (hostAddress == null) {
return defaultHostAddress;
}
return hostAddress;
}
public static InetAddress getLocalAddress() {
return localAddress;
}
public static NetworkClass getNetworkClass(InetAddress address) {
if (address == null || address.isAnyLocalAddress()) {
return NetworkClass.ANY;
}
if (address.isLoopbackAddress()) {
return NetworkClass.LOOPBACK;
}
if (address.isLinkLocalAddress() || address.isSiteLocalAddress()) {
return NetworkClass.LOCAL;
}
return NetworkClass.PUBLIC;
}
public static String format(InetAddress address) {
return format(address, -1);
}
public static String format(InetSocketAddress address) {
return format(address.getAddress(), address.getPort());
}
public static String format(InetAddress address, int port) {
Objects.requireNonNull(address);
StringBuilder sb = new StringBuilder();
if (port != -1 && address instanceof Inet6Address) {
sb.append(toUriString(address));
} else {
sb.append(toAddrString(address));
}
if (port != -1) {
sb.append(':').append(port);
}
return sb.toString();
}
public static String toUriString(InetAddress ip) {
if (ip instanceof Inet6Address) {
return "[" + toAddrString(ip) + "]";
}
return toAddrString(ip);
}
public static String toAddrString(InetAddress ip) {
if (ip == null) {
throw new NullPointerException("ip");
}
if (ip instanceof Inet4Address) {
byte[] bytes = ip.getAddress();
return (bytes[0] & 0xff) + "." + (bytes[1] & 0xff) + "." + (bytes[2] & 0xff) + "." + (bytes[3] & 0xff);
}
if (!(ip instanceof Inet6Address)) {
throw new IllegalArgumentException("ip");
}
byte[] bytes = ip.getAddress();
int[] hextets = new int[8];
for (int i = 0; i < hextets.length; i++) {
hextets[i] = (bytes[2 * i] & 255) << 8 | bytes[2 * i + 1] & 255;
}
compressLongestRunOfZeroes(hextets);
return hextetsToIPv6String(hextets);
}
public static boolean matchesNetwork(NetworkClass given, NetworkClass expected) {
switch (expected) {
case ANY:
return EnumSet.of(NetworkClass.LOOPBACK, NetworkClass.LOCAL, NetworkClass.PUBLIC, NetworkClass.ANY).contains(given);
case PUBLIC:
return EnumSet.of(NetworkClass.LOOPBACK, NetworkClass.LOCAL, NetworkClass.PUBLIC).contains(given);
case LOCAL:
return EnumSet.of(NetworkClass.LOOPBACK, NetworkClass.LOCAL).contains(given);
case LOOPBACK:
return NetworkClass.LOOPBACK == given;
}
return false;
}
public static InetAddress getFirstNonLoopbackAddress(NetworkProtocolVersion ipversion) {
InetAddress address;
for (NetworkInterface networkInterface : getAllNetworkInterfaces()) {
try {
if (!networkInterface.isUp() || networkInterface.isLoopback()) {
continue;
}
} catch (Exception e) {
logger.log(Level.WARNING, e.getMessage(), e);
continue;
}
address = getFirstNonLoopbackAddress(networkInterface, ipversion);
if (address != null) {
return address;
}
}
return null;
}
public static InetAddress getFirstNonLoopbackAddress(NetworkInterface networkInterface, NetworkProtocolVersion ipVersion) {
if (networkInterface == null) {
throw new IllegalArgumentException("network interface is null");
}
for (Enumeration<InetAddress> addresses = networkInterface.getInetAddresses(); addresses.hasMoreElements(); ) {
InetAddress address = addresses.nextElement();
if (!address.isLoopbackAddress() && (address instanceof Inet4Address && ipVersion == NetworkProtocolVersion.IPV4) ||
(address instanceof Inet6Address && ipVersion == NetworkProtocolVersion.IPV6)) {
return address;
}
}
return null;
}
public static InetAddress getFirstAddress(NetworkInterface networkInterface, NetworkProtocolVersion ipVersion) {
if (networkInterface == null) {
throw new IllegalArgumentException("network interface is null");
}
for (Enumeration<InetAddress> addresses = networkInterface.getInetAddresses(); addresses.hasMoreElements(); ) {
InetAddress address = addresses.nextElement();
if ((address instanceof Inet4Address && ipVersion == NetworkProtocolVersion.IPV4) ||
(address instanceof Inet6Address && ipVersion == NetworkProtocolVersion.IPV6)) {
return address;
}
}
return null;
}
public static boolean interfaceSupports(NetworkInterface networkInterface, NetworkProtocolVersion ipVersion) {
boolean supportsVersion = false;
if (networkInterface != null) {
Enumeration<InetAddress> addresses = networkInterface.getInetAddresses();
while (addresses.hasMoreElements()) {
InetAddress address = addresses.nextElement();
if ((address instanceof Inet4Address && (ipVersion == NetworkProtocolVersion.IPV4)) ||
(address instanceof Inet6Address && (ipVersion == NetworkProtocolVersion.IPV6))) {
supportsVersion = true;
break;
}
}
}
return supportsVersion;
}
public static NetworkProtocolVersion getProtocolVersion() {
switch (findAvailableProtocols()) {
case IPV4:
return NetworkProtocolVersion.IPV4;
case IPV6:
return NetworkProtocolVersion.IPV6;
case IPV46:
if (Boolean.getBoolean(System.getProperty(IPV4_SETTING))) {
return NetworkProtocolVersion.IPV4;
}
if (Boolean.getBoolean(System.getProperty(IPV6_SETTING))) {
return NetworkProtocolVersion.IPV6;
}
return NetworkProtocolVersion.IPV6;
default:
break;
}
return NetworkProtocolVersion.NONE;
}
public static NetworkProtocolVersion findAvailableProtocols() {
boolean hasIPv4 = false;
boolean hasIPv6 = false;
for (InetAddress addr : getAllAvailableAddresses()) {
if (addr instanceof Inet4Address) {
hasIPv4 = true;
}
if (addr instanceof Inet6Address) {
hasIPv6 = true;
}
}
if (hasIPv4 && hasIPv6) {
return NetworkProtocolVersion.IPV46;
}
if (hasIPv4) {
return NetworkProtocolVersion.IPV4;
}
if (hasIPv6) {
return NetworkProtocolVersion.IPV6;
}
return NetworkProtocolVersion.NONE;
}
public static InetAddress resolveInetAddress(String hostname, String defaultValue) throws IOException {
String host = hostname;
if (host == null) {
host = defaultValue;
}
String origHost = host;
int pos = host.indexOf(':');
if (pos > 0) {
host = host.substring(0, pos - 1);
}
if ((host.startsWith("#") && host.endsWith("#")) || (host.startsWith("_") && host.endsWith("_"))) {
host = host.substring(1, host.length() - 1);
if ("local".equals(host)) {
return getLocalAddress();
} else if (host.startsWith("non_loopback")) {
if (host.toLowerCase(Locale.ROOT).endsWith(":ipv4")) {
return getFirstNonLoopbackAddress(NetworkProtocolVersion.IPV4);
} else if (host.toLowerCase(Locale.ROOT).endsWith(":ipv6")) {
return getFirstNonLoopbackAddress(NetworkProtocolVersion.IPV6);
} else {
return getFirstNonLoopbackAddress(getProtocolVersion());
}
} else {
NetworkProtocolVersion networkProtocolVersion = getProtocolVersion();
if (host.toLowerCase(Locale.ROOT).endsWith(":ipv4")) {
networkProtocolVersion = NetworkProtocolVersion.IPV4;
host = host.substring(0, host.length() - 5);
} else if (host.toLowerCase(Locale.ROOT).endsWith(":ipv6")) {
networkProtocolVersion = NetworkProtocolVersion.IPV6;
host = host.substring(0, host.length() - 5);
}
for (NetworkInterface ni : getInterfaces(NetworkUtils::isUp)) {
if (host.equals(ni.getName()) || host.equals(ni.getDisplayName())) {
if (ni.isLoopback()) {
return getFirstAddress(ni, networkProtocolVersion);
} else {
return getFirstNonLoopbackAddress(ni, networkProtocolVersion);
}
}
}
}
throw new IOException("failed to find network interface for [" + origHost + "]");
}
return InetAddress.getByName(host);
}
public static InetAddress resolvePublicHostAddress(String host) throws IOException {
InetAddress address = resolveInetAddress(host, null);
if (address == null || address.isAnyLocalAddress()) {
address = getFirstNonLoopbackAddress(NetworkProtocolVersion.IPV4);
if (address == null) {
address = getFirstNonLoopbackAddress(getProtocolVersion());
if (address == null) {
address = getLocalAddress();
if (address == null) {
return getLocalhost(NetworkProtocolVersion.IPV4);
}
}
}
}
return address;
}
private static List<NetworkInterface> getAllNetworkInterfaces() {
return getInterfaces(n -> true);
}
public static List<NetworkInterface> getAllRunningAndUpInterfaces() {
return getInterfaces(NetworkUtils::isUp);
}
public static List<NetworkInterface> getInterfaces(Predicate<NetworkInterface> predicate) {
List<NetworkInterface> networkInterfaces = new ArrayList<>();
Enumeration<NetworkInterface> interfaces;
try {
interfaces = NetworkInterface.getNetworkInterfaces();
} catch (Exception e) {
return networkInterfaces;
}
while (interfaces.hasMoreElements()) {
NetworkInterface networkInterface = interfaces.nextElement();
if (predicate.test(networkInterface)) {
networkInterfaces.add(networkInterface);
Enumeration<NetworkInterface> subInterfaces = networkInterface.getSubInterfaces();
if (subInterfaces.hasMoreElements()) {
while (subInterfaces.hasMoreElements()) {
networkInterfaces.add(subInterfaces.nextElement());
}
}
}
}
sortInterfaces(networkInterfaces);
return networkInterfaces;
}
public static List<InetAddress> getAllAvailableAddresses() {
List<InetAddress> allAddresses = new ArrayList<>();
for (NetworkInterface networkInterface : getAllNetworkInterfaces()) {
Enumeration<InetAddress> addrs = networkInterface.getInetAddresses();
while (addrs.hasMoreElements()) {
allAddresses.add(addrs.nextElement());
}
}
sortAddresses(allAddresses);
return allAddresses;
}
public static String displayNetworkInterfaces() {
StringBuilder sb = new StringBuilder();
for (NetworkInterface nic : getAllNetworkInterfaces()) {
sb.append(displayNetworkInterface(nic));
}
return sb.toString();
}
public static String displayNetworkInterface(NetworkInterface nic) {
StringBuilder sb = new StringBuilder();
sb.append(lf).append(nic.getName()).append(lf);
if (!nic.getName().equals(nic.getDisplayName())) {
sb.append("\t").append(nic.getDisplayName()).append(lf);
}
sb.append("\t").append("flags ");
List<String> flags = new ArrayList<>();
try {
if (nic.isUp()) {
flags.add("UP");
}
if (nic.supportsMulticast()) {
flags.add("MULTICAST");
}
if (nic.isLoopback()) {
flags.add("LOOPBACK");
}
if (nic.isPointToPoint()) {
flags.add("POINTTOPOINT");
}
if (nic.isVirtual()) {
flags.add("VIRTUAL");
}
} catch (Exception e) {
logger.log(Level.WARNING, e.getMessage(), e);
}
sb.append(String.join(",", flags));
try {
sb.append(" mtu ").append(nic.getMTU()).append(lf);
} catch (SocketException e) {
logger.log(Level.WARNING, e.getMessage(), e);
}
List<InterfaceAddress> addresses = nic.getInterfaceAddresses();
for (InterfaceAddress address : addresses) {
sb.append("\t").append(formatAddress(address)).append(lf);
}
try {
byte[] b = nic.getHardwareAddress();
if (b != null) {
sb.append("\t").append("ether ");
for (int i = 0; i < b.length; i++) {
if (i > 0) {
sb.append(":");
}
sb.append(hexDigit[(b[i] >> 4) & 0x0f]).append(hexDigit[b[i] & 0x0f]);
}
sb.append(lf);
}
} catch (SocketException e) {
logger.log(Level.WARNING, e.getMessage(), e);
}
return sb.toString();
}
private static void sortInterfaces(List<NetworkInterface> interfaces) {
interfaces.sort(Comparator.comparingInt(NetworkInterface::getIndex));
}
private static void sortAddresses(List<InetAddress> addressList) {
addressList.sort((o1, o2) -> compareBytes(o1.getAddress(), o2.getAddress()));
}
private static String formatAddress(InterfaceAddress interfaceAddress) {
StringBuilder sb = new StringBuilder();
InetAddress address = interfaceAddress.getAddress();
if (address instanceof Inet6Address) {
sb.append("inet6 ").append(format(address))
.append(" prefixlen:").append(interfaceAddress.getNetworkPrefixLength());
} else {
int netmask = 0xFFFFFFFF << (32 - interfaceAddress.getNetworkPrefixLength());
byte[] b = new byte[] { (byte)(netmask >>> 24), (byte)(netmask >>> 16 & 0xFF),
(byte)(netmask >>> 8 & 0xFF), (byte)(netmask & 0xFF) };
sb.append("inet ").append(format(address));
try {
sb.append(" netmask:").append(format(InetAddress.getByAddress(b)));
} catch (UnknownHostException e) {
logger.log(Level.WARNING, e.getMessage(), e);
}
InetAddress broadcast = interfaceAddress.getBroadcast();
if (broadcast != null) {
sb.append(" broadcast:").append(format(broadcast));
}
}
if (address.isLoopbackAddress()) {
sb.append(" scope:host");
} else if (address.isLinkLocalAddress()) {
sb.append(" scope:link");
} else if (address.isSiteLocalAddress()) {
sb.append(" scope:site");
}
return sb.toString();
}
private static boolean isUp(NetworkInterface networkInterface) {
try {
return networkInterface.isUp();
} catch (SocketException e) {
return false;
}
}
private static int compareBytes(byte[] left, byte[] right) {
for (int i = 0, j = 0; i < left.length && j < right.length; i++, j++) {
int a = left[i] & 0xff;
int b = right[j] & 0xff;
if (a != b) {
return a - b;
}
}
return left.length - right.length;
}
private static void compressLongestRunOfZeroes(int[] hextets) {
int bestRunStart = -1;
int bestRunLength = -1;
int runStart = -1;
for (int i = 0; i < hextets.length + 1; i++) {
if (i < hextets.length && hextets[i] == 0) {
if (runStart < 0) {
runStart = i;
}
} else if (runStart >= 0) {
int runLength = i - runStart;
if (runLength > bestRunLength) {
bestRunStart = runStart;
bestRunLength = runLength;
}
runStart = -1;
}
}
if (bestRunLength >= 2) {
Arrays.fill(hextets, bestRunStart, bestRunStart + bestRunLength, -1);
}
}
private static String hextetsToIPv6String(int[] hextets) {
StringBuilder sb = new StringBuilder(39);
boolean lastWasNumber = false;
for (int i = 0; i < hextets.length; i++) {
boolean b = hextets[i] >= 0;
if (b) {
if (lastWasNumber) {
sb.append(':');
}
sb.append(Integer.toHexString(hextets[i]));
} else {
if (i == 0 || lastWasNumber) {
sb.append("::");
}
}
lastWasNumber = b;
}
return sb.toString();
}
}

View file

@ -0,0 +1,4 @@
/**
* Utilities for Netty HTTP client.
*/
package org.xbib.netty.http.client.util;

View file

@ -33,10 +33,9 @@ public class AkamaiTest {
private static final Logger logger = Logger.getLogger("");
@Test
public void testAkamai() throws Exception {
public void testAkamaiHttps() throws Exception {
// here we can not deal with server PUSH_PROMISE as response to headers, a go-away frame is written.
// Probably because promised stream id is 2 which is smaller than 3?
// here we see server PUSH_PROMISE as response to headers, a go-away frame is written.
/*
----------------INBOUND--------------------
@ -59,11 +58,17 @@ public class AkamaiTest {
httpClient.prepareGet()
.setVersion("HTTP/2.0")
.setURL("https://http2.akamai.com/demo/h2_demo_frame.html")
.onError(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onResponse(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
})
.onPushReceived((headers, fullHttpResponse) -> {
logger.log(Level.INFO, "received push promise: request headers = " + headers
+ " status = " + fullHttpResponse.status()
+ " response headers = " + fullHttpResponse.headers().entries()
);
})
.execute()
.get();
httpClient.close();

View file

@ -15,7 +15,6 @@
*/
package org.xbib.netty.http.client.test;
import org.junit.Ignore;
import org.junit.Test;
import org.xbib.netty.http.client.HttpClient;
import org.xbib.netty.http.client.HttpRequestBuilder;
@ -64,7 +63,7 @@ public class ElasticsearchTest {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
})
.onError(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.execute()
.get();
httpClient.close();
@ -82,7 +81,7 @@ public class ElasticsearchTest {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
})
.onError(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.execute()
.get();
httpClient.close();
@ -122,6 +121,6 @@ public class ElasticsearchTest {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
})
.onError(e -> logger.log(Level.SEVERE, e.getMessage(), e));
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e));
}
}

View file

@ -60,7 +60,7 @@ public class ExceptionTest {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
})
.onError(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.execute()
.get();
httpClient.close();

View file

@ -1,6 +1,5 @@
package org.xbib.netty.http.client.test;
import org.junit.Ignore;
import org.junit.Test;
import org.xbib.netty.http.client.HttpClient;
import org.xbib.netty.http.client.HttpRequestBuilder;
@ -41,7 +40,8 @@ public class GoogleTest {
.build();
httpClient.prepareGet()
.setURL("http://www.google.com")
.onError(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onHeaders(headers -> logger.log(Level.INFO, headers.toString()))
.onResponse(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
@ -51,7 +51,7 @@ public class GoogleTest {
httpClient.close();
}
@Test
public void testGoogleWithoutFollowRedirects() throws Exception {
HttpClient httpClient = HttpClient.builder()
.build();
@ -68,13 +68,14 @@ public class GoogleTest {
httpClient.close();
}
@Test
public void testGoogleHttps1() throws Exception {
HttpClient httpClient = HttpClient.builder()
.build();
httpClient.prepareGet()
.setURL("https://www.google.com")
.onError(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onResponse(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
@ -92,7 +93,7 @@ public class GoogleTest {
httpClient.prepareGet()
.setVersion("HTTP/2.0")
.setURL("https://www.google.com")
.onError(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onResponse(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
@ -110,8 +111,7 @@ public class GoogleTest {
HttpRequestBuilder builder1 = httpClient.prepareGet()
.setVersion("HTTP/2.0")
.setURL("https://www.google.com")
.setTimeout(10000)
.onError(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onResponse(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
@ -120,19 +120,15 @@ public class GoogleTest {
HttpRequestBuilder builder2 = httpClient.prepareGet()
.setVersion("HTTP/2.0")
.setURL("https://www.google.com")
.setTimeout(10000)
.onError(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onResponse(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
});
// only sequential ... this sucks.
HttpRequestContext context1 = builder1.execute();
context1.get();
HttpRequestContext context2 = builder2.execute();
context1.get();
context2.get();
httpClient.close();

View file

@ -68,7 +68,8 @@ public class Http2FrameAdapterTest {
@Test
public void testHttp2FrameAdapter() throws Exception {
final int serverExpectedDataFrames = 1;
final InetSocketAddress inetSocketAddress = new InetSocketAddress("http2-push.io", 443);
//final InetSocketAddress inetSocketAddress = new InetSocketAddress("http2-push.io", 443);
final InetSocketAddress inetSocketAddress = new InetSocketAddress("webtide.com", 443);
final CountDownLatch dataLatch = new CountDownLatch(serverExpectedDataFrames);
EventLoopGroup group = new NioEventLoopGroup();
Channel clientChannel = null;

View file

@ -40,7 +40,7 @@ public class Http2PushioTest {
httpClient.prepareGet()
.setVersion("HTTP/2.0")
.setURL("https://http2-push.io")
.onError(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onResponse(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);

View file

@ -0,0 +1,64 @@
package org.xbib.netty.http.client.test;
import org.junit.Test;
import org.xbib.netty.http.client.HttpClient;
import java.nio.charset.StandardCharsets;
import java.util.logging.ConsoleHandler;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
/**
*/
public class HttpBinTest {
static {
System.setProperty("java.util.logging.SimpleFormatter.format",
"%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$-7s [%3$s] %2$s %5$s %6$s%n");
LogManager.getLogManager().reset();
Logger rootLogger = LogManager.getLogManager().getLogger("");
Handler handler = new ConsoleHandler();
handler.setFormatter(new SimpleFormatter());
rootLogger.addHandler(handler);
rootLogger.setLevel(Level.ALL);
for (Handler h : rootLogger.getHandlers()) {
handler.setFormatter(new SimpleFormatter());
h.setLevel(Level.ALL);
}
}
private static final Logger logger = Logger.getLogger("");
/**
* The reponse body should be
* <pre>
* {
* "cookies": {
* "name": "value"
* }
* }
* </pre>
* @throws Exception
*/
@Test
public void testHttpBin() throws Exception {
HttpClient httpClient = HttpClient.builder()
.build();
httpClient.prepareGet()
.setURL("http://httpbin.org/cookies/set?name=value")
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onCookie(cookie -> logger.log(Level.INFO, cookie.toString()))
.onHeaders(headers -> logger.log(Level.INFO, headers.toString()))
.onResponse(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
})
.execute()
.get();
httpClient.close();
}
}

View file

@ -1,306 +0,0 @@
package org.xbib.netty.http.client.test;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2ConnectionPrefaceWrittenEvent;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2SecurityUtil;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.HttpConversionUtil;
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandler;
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandlerBuilder;
import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapterBuilder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.ApplicationProtocolConfig;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
import io.netty.handler.ssl.SslCloseCompletionEvent;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslProvider;
import io.netty.handler.ssl.SupportedCipherSuiteFilter;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.internal.PlatformDependent;
import org.junit.Test;
import javax.net.ssl.SNIHostName;
import javax.net.ssl.SNIServerName;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLParameters;
import java.net.InetSocketAddress;
import java.net.URL;
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.logging.ConsoleHandler;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
/**
*/
public class InboundHttp2ToHttpAdapterTest {
static {
System.setProperty("java.util.logging.SimpleFormatter.format",
"%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$-7s [%3$s] %2$s %5$s %6$s%n");
LogManager.getLogManager().reset();
Logger rootLogger = LogManager.getLogManager().getLogger("");
Handler handler = new ConsoleHandler();
handler.setFormatter(new SimpleFormatter());
rootLogger.addHandler(handler);
rootLogger.setLevel(Level.ALL);
for (Handler h : rootLogger.getHandlers()) {
handler.setFormatter(new SimpleFormatter());
h.setLevel(Level.ALL);
}
}
private static final Logger logger = Logger.getLogger("");
@Test
public void testInboundHttp2ToHttpAdapter() throws Exception {
URL url = new URL("https://http2-push.io");
final InetSocketAddress inetSocketAddress = new InetSocketAddress(url.getHost(), 443);
EventLoopGroup group = new NioEventLoopGroup();
Channel clientChannel = null;
SettingsHandler settingsHandler = new SettingsHandler();
ResponseHandler responseHandler = new ResponseHandler();
try {
Bootstrap bs = new Bootstrap();
bs.group(group);
bs.channel(NioSocketChannel.class);
bs.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new TrafficLoggingHandler());
SslContext sslContext = SslContextBuilder.forClient()
.sslProvider(SslProvider.OPENSSL)
.trustManager(InsecureTrustManagerFactory.INSTANCE)
.ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
.applicationProtocolConfig(new ApplicationProtocolConfig(
ApplicationProtocolConfig.Protocol.ALPN,
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
ApplicationProtocolNames.HTTP_2))
.build();
SslHandler sslHandler = sslContext.newHandler(ch.alloc());
SSLEngine engine = sslHandler.engine();
String fullQualifiedHostname = inetSocketAddress.getHostName();
SSLParameters params = engine.getSSLParameters();
params.setServerNames(Arrays.asList(new SNIServerName[]{new SNIHostName(fullQualifiedHostname)}));
engine.setSSLParameters(params);
ch.pipeline().addLast(sslHandler);
ch.pipeline().addLast(new Http2NegotiationHandler(settingsHandler, responseHandler));
}
});
clientChannel = bs.connect(inetSocketAddress).syncUninterruptibly().channel();
settingsHandler.awaitSettings(clientChannel.newPromise());
HttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.valueOf("HTTP/2.0"),
HttpMethod.GET, url.toExternalForm());
logger.log(Level.FINE, "HTTP2: sending request");
responseHandler.put(3, clientChannel.write(httpRequest), clientChannel.newPromise());
clientChannel.flush();
logger.log(Level.FINE, "HTTP2: waiting for responses");
responseHandler.awaitResponses();
logger.log(Level.FINE, "HTTP2: done");
} finally {
if (clientChannel != null) {
clientChannel.close();
}
group.shutdownGracefully();
}
}
private HttpToHttp2ConnectionHandler createHttp2ConnectionHandler() {
final Http2Connection http2Connection = new DefaultHttp2Connection(false);
return new HttpToHttp2ConnectionHandlerBuilder()
.connection(http2Connection)
.frameLogger(new Http2FrameLogger(LogLevel.INFO, "client"))
.frameListener(new DelegatingDecompressorFrameListener(http2Connection,
new InboundHttp2ToHttpAdapterBuilder(http2Connection)
.maxContentLength(10 * 1024 * 1024)
.propagateSettings(true)
.validateHttpHeaders(false)
.build()))
.build();
}
class Http2NegotiationHandler extends ApplicationProtocolNegotiationHandler {
private final SettingsHandler settingsHandler;
private final ResponseHandler responseHandler;
Http2NegotiationHandler(SettingsHandler settingsHandler, ResponseHandler responseHandler) {
super("");
this.settingsHandler = settingsHandler;
this.responseHandler = responseHandler;
}
@Override
protected void configurePipeline(ChannelHandlerContext ctx, String protocol) throws Exception {
if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
ctx.pipeline().addLast(createHttp2ConnectionHandler());
ctx.pipeline().addLast(settingsHandler);
ctx.pipeline().addLast(new UserEventLogger());
ctx.pipeline().addLast(responseHandler);
logger.log(Level.FINE, "negotiated HTTP/2: pipeline = " + ctx.pipeline().names());
return;
}
ctx.close();
throw new IllegalStateException("unexpected protocol: " + protocol);
}
}
class SettingsHandler extends SimpleChannelInboundHandler<Http2Settings> {
private ChannelPromise promise;
@Override
protected void channelRead0(ChannelHandlerContext ctx, Http2Settings msg) throws Exception {
promise.setSuccess();
ctx.pipeline().remove(this);
}
void awaitSettings(ChannelPromise promise) throws Exception {
this.promise = promise;
int timeout = 5000;
if (!promise.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS)) {
throw new IllegalStateException("time out while waiting for HTTP/2 settings");
}
if (!promise.isSuccess()) {
throw new RuntimeException(promise.cause());
}
}
}
class ResponseHandler extends SimpleChannelInboundHandler<FullHttpResponse> {
private final Map<Integer, Map.Entry<ChannelFuture, ChannelPromise>> streamidPromiseMap;
ResponseHandler() {
this.streamidPromiseMap = PlatformDependent.newConcurrentHashMap();
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse httpResponse) throws Exception {
Integer streamId = httpResponse.headers().getInt(HttpConversionUtil.ExtensionHeaderNames.STREAM_ID.text());
if (streamId == null) {
logger.log(Level.WARNING, () -> "stream ID missing");
return;
}
Map.Entry<ChannelFuture, ChannelPromise> entry = streamidPromiseMap.get(streamId);
if (entry != null) {
entry.getValue().setSuccess();
} else {
logger.log(Level.WARNING, () -> "stream id not found in promise map: " + streamId);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.log(Level.FINE, () -> "exception caught " + cause.getMessage());
}
void put(int streamId, ChannelFuture channelFuture, ChannelPromise promise) {
logger.log(Level.FINE, () -> "put stream ID " + streamId);
streamidPromiseMap.put(streamId, new AbstractMap.SimpleEntry<>(channelFuture, promise));
}
void awaitResponses() {
int timeout = 5000;
Iterator<Map.Entry<Integer, Map.Entry<ChannelFuture, ChannelPromise>>> iterator = streamidPromiseMap.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<Integer, Map.Entry<ChannelFuture, ChannelPromise>> entry = iterator.next();
ChannelFuture channelFuture = entry.getValue().getKey();
if (!channelFuture.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS)) {
throw new IllegalStateException("time out while waiting to write for stream id " + entry.getKey());
}
if (!channelFuture.isSuccess()) {
throw new RuntimeException(channelFuture.cause());
}
ChannelPromise promise = entry.getValue().getValue();
if (!promise.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS)) {
throw new IllegalStateException("time out while waiting for response on stream id " + entry.getKey());
}
if (!promise.isSuccess()) {
throw new RuntimeException(promise.cause());
}
iterator.remove();
}
}
}
class UserEventLogger extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
logger.log(Level.FINE, () -> "got user event " + evt);
if (evt instanceof Http2ConnectionPrefaceWrittenEvent ||
evt instanceof SslCloseCompletionEvent ||
evt instanceof ChannelInputShutdownReadComplete) {
// Expected events
logger.log(Level.FINE, () -> "user event is expected: " + evt);
return;
}
super.userEventTriggered(ctx, evt);
}
}
class TrafficLoggingHandler extends LoggingHandler {
TrafficLoggingHandler() {
super("client", LogLevel.TRACE);
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelRegistered();
}
@Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelUnregistered();
}
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof ByteBuf && !((ByteBuf) msg).isReadable()) {
ctx.write(msg, promise);
} else {
super.write(ctx, msg, promise);
}
}
}
}

View file

@ -16,7 +16,6 @@
package org.xbib.netty.http.client.test;
import io.netty.handler.codec.http.FullHttpResponse;
import org.junit.Ignore;
import org.junit.Test;
import org.xbib.netty.http.client.HttpClient;
import org.xbib.netty.http.client.HttpRequestBuilder;
@ -53,14 +52,13 @@ public class IndexHbzTest {
private static final Logger logger = Logger.getLogger("");
@Test
public void testIndexHbz() throws Exception {
HttpClient httpClient = HttpClient.builder()
.build();
httpClient.prepareGet()
.setVersion("HTTP/1.1")
.setURL("http://index.hbz-nrw.de")
.onError(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onResponse(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
@ -77,7 +75,7 @@ public class IndexHbzTest {
httpClient.prepareGet()
.setVersion("HTTP/1.1")
.setURL("https://index.hbz-nrw.de")
.onError(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onResponse(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
@ -123,7 +121,7 @@ public class IndexHbzTest {
.setVersion("HTTP/2.0")
.setURL("https://index.hbz-nrw.de")
.setTimeout(5000)
.onError(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onResponse(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
@ -133,7 +131,6 @@ public class IndexHbzTest {
httpClient.close();
}
@Test
public void testIndexHbzH2C() throws Exception {
// times out waiting for http2 settings frame
@ -144,7 +141,7 @@ public class IndexHbzTest {
httpClient.prepareGet()
.setVersion("HTTP/2.0")
.setURL("http://index.hbz-nrw.de")
.onError(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onResponse(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
@ -155,7 +152,7 @@ public class IndexHbzTest {
}
@Test
public void testIndexHbzConcurrent() throws Exception {
public void testIndexHbzConcurrentHttp1() throws Exception {
HttpClient httpClient = HttpClient.builder()
.build();
@ -163,7 +160,7 @@ public class IndexHbzTest {
HttpRequestBuilder builder1 = httpClient.prepareGet()
.setVersion("HTTP/1.1")
.setURL("http://index.hbz-nrw.de")
.onError(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onResponse(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
@ -172,7 +169,7 @@ public class IndexHbzTest {
HttpRequestBuilder builder2 = httpClient.prepareGet()
.setVersion("HTTP/1.1")
.setURL("http://index.hbz-nrw.de")
.onError(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onResponse(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);

View file

@ -0,0 +1,60 @@
package org.xbib.netty.http.client.test;
import org.junit.Test;
import org.xbib.netty.http.client.HttpClient;
import java.nio.charset.StandardCharsets;
import java.util.logging.ConsoleHandler;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
/**
*/
public class WebtideTest {
static {
System.setProperty("java.util.logging.SimpleFormatter.format",
"%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$-7s [%3$s] %2$s %5$s %6$s%n");
LogManager.getLogManager().reset();
Logger rootLogger = LogManager.getLogManager().getLogger("");
Handler handler = new ConsoleHandler();
handler.setFormatter(new SimpleFormatter());
rootLogger.addHandler(handler);
rootLogger.setLevel(Level.FINE);
for (Handler h : rootLogger.getHandlers()) {
handler.setFormatter(new SimpleFormatter());
h.setLevel(Level.FINE);
}
}
private static final Logger logger = Logger.getLogger("");
@Test
public void testWebtide() throws Exception {
HttpClient httpClient = HttpClient.builder()
.build();
httpClient.prepareGet()
.setVersion("HTTP/2.0")
.setURL("https://webtide.com")
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onResponse(fullHttpResponse -> {
logger.log(Level.INFO, "status = " + fullHttpResponse.status()
+ " response headers = " + fullHttpResponse.headers().entries()
);
})
.onPushReceived((headers, fullHttpResponse) -> {
logger.log(Level.INFO, "received push promise: request headers = " + headers
+ " status = " + fullHttpResponse.status()
+ " response headers = " + fullHttpResponse.headers().entries()
);
})
.execute()
.get();
httpClient.close();
}
}

View file

@ -92,6 +92,7 @@ public class XbibTest {
}
@Test
@Ignore
public void testXbibOrgWithProxy() throws Exception {
HttpClient httpClient = HttpClient.builder()
.setHttpProxyHandler(new InetSocketAddress("80.241.223.251", 8080))
@ -104,7 +105,7 @@ public class XbibTest {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
})
.onError(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.execute()
.get();
httpClient.close();
@ -122,7 +123,7 @@ public class XbibTest {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
})
.onError(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.execute()
.get();
httpClient.close();
@ -137,7 +138,7 @@ public class XbibTest {
httpClient.prepareGet()
.setVersion("HTTP/1.1")
.setURL("http://xbib.org")
.onError(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onResponse(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
@ -148,7 +149,7 @@ public class XbibTest {
httpClient.prepareGet()
.setVersion("HTTP/1.1")
.setURL("http://xbib.org")
.onError(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.onResponse(fullHttpResponse -> {
String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);