replace latch with future

This commit is contained in:
Jörg Prante 2017-05-30 00:24:22 +02:00
parent 1765953a36
commit 7af00b6869
11 changed files with 461 additions and 70 deletions

View file

@ -1,6 +1,6 @@
group = org.xbib
name = netty-http-client
version = 4.1.11.2
version = 4.1.11.3
netty.version = 4.1.11.Final
tcnative.version = 2.0.1.Final

View file

@ -50,6 +50,7 @@ import org.xbib.netty.http.client.util.NetworkUtils;
import java.io.Closeable;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.ConnectException;
import java.net.URI;
import java.net.URLDecoder;
import java.util.Collection;
@ -70,6 +71,13 @@ public final class HttpClient implements Closeable {
private static final HttpClient INSTANCE = HttpClient.builder().build();
static {
NetworkUtils.extendSystemProperties();
logger.log(Level.FINE, () -> "OpenSSL ALPN support: " + OpenSsl.isAlpnSupported());
logger.log(Level.FINE, () -> "local host name = " + NetworkUtils.getLocalHostName("localhost"));
logger.log(Level.FINE, NetworkUtils::displayNetworkInterfaces);
}
private final ByteBufAllocator byteBufAllocator;
private final EventLoopGroup eventLoopGroup;
@ -87,10 +95,6 @@ public final class HttpClient implements Closeable {
this.byteBufAllocator = byteBufAllocator;
this.eventLoopGroup = eventLoopGroup;
this.poolMap = new HttpClientChannelPoolMap(this, httpClientChannelContext, bootstrap, maxConnections);
logger.log(Level.FINE, () -> "OpenSSL ALPN support: " + OpenSsl.isAlpnSupported());
NetworkUtils.extendSystemProperties();
logger.log(Level.FINE, () -> "local host name = " + NetworkUtils.getLocalHostName("localhost"));
logger.log(Level.FINE, NetworkUtils::displayNetworkInterfaces);
}
public static HttpClient getInstance() {
@ -225,7 +229,7 @@ public final class HttpClient implements Closeable {
logger.log(Level.FINE, () -> "closing pool map");
poolMap.close();
logger.log(Level.FINE, () -> "closing event loop group");
if (!eventLoopGroup.isTerminated()) {
if (!eventLoopGroup.isShuttingDown()) {
eventLoopGroup.shutdownGracefully();
}
logger.log(Level.FINE, () -> "closed");
@ -360,7 +364,7 @@ public final class HttpClient implements Closeable {
if (exceptionListener != null) {
exceptionListener.onException(future.cause());
}
httpRequestContext.fail("channel pool failure");
httpRequestContext.fail(new ConnectException("unable to connect to " + inetAddressKey));
}
});
}

View file

@ -52,8 +52,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.logging.Level;
@ -102,6 +100,8 @@ public class HttpClientRequestBuilder implements HttpRequestBuilder, HttpRequest
private HttpRequest httpRequest;
private HttpRequestFuture<String> httpRequestFuture = DEFAULT_FUTURE;
private HttpRequestContext httpRequestContext;
private HttpResponseListener httpResponseListener;
@ -141,6 +141,11 @@ public class HttpClientRequestBuilder implements HttpRequestBuilder, HttpRequest
return new HttpClientRequestBuilder(httpMethod, UnpooledByteBufAllocator.DEFAULT, 3);
}
public HttpRequestBuilder withFuture(HttpRequestFuture<String> httpRequestFuture) {
this.httpRequestFuture = httpRequestFuture;
return this;
}
@Override
public HttpRequestBuilder setHttp1() {
this.httpVersion = HttpVersion.HTTP_1_1;
@ -377,12 +382,11 @@ public class HttpClientRequestBuilder implements HttpRequestBuilder, HttpRequest
if (httpResponseListener == null) {
httpResponseListener = httpRequestContext;
}
httpRequestContext = new HttpRequestContext(uri, httpRequest, streamId,
new AtomicBoolean(false),
new AtomicBoolean(false),
httpRequestContext = new HttpRequestContext(uri, httpRequest,
httpRequestFuture,
streamId,
timeout, System.currentTimeMillis(),
followRedirect, maxRedirects, new AtomicInteger(0),
new CountDownLatch(1),
httpResponseListener,
exceptionListener,
httpHeadersListener,

View file

@ -34,9 +34,9 @@ import java.util.AbstractMap;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -53,9 +53,7 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque
private final HttpRequest httpRequest;
private final AtomicBoolean succeeded;
private final AtomicBoolean failed;
private final HttpRequestFuture<String> httpRequestFuture;
private final boolean followRedirect;
@ -67,8 +65,6 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque
private final Long startTime;
private final CountDownLatch latch;
private final AtomicInteger streamId;
private final HttpResponseListener httpResponseListener;
@ -91,15 +87,13 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque
private Map<Integer, FullHttpResponse> httpResponses;
private boolean hastimeout;
private Long stopTime;
HttpRequestContext(URI uri, HttpRequest httpRequest, AtomicInteger streamId,
AtomicBoolean succeeded, AtomicBoolean failed,
HttpRequestContext(URI uri, HttpRequest httpRequest,
HttpRequestFuture<String> httpRequestFuture,
AtomicInteger streamId,
int timeout, Long startTime,
boolean followRedirect, int maxRedirects, AtomicInteger redirectCount,
CountDownLatch latch,
HttpResponseListener httpResponseListener,
ExceptionListener exceptionListener,
HttpHeadersListener httpHeadersListener,
@ -107,15 +101,13 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque
HttpPushListener httpPushListener) {
this.uri = uri;
this.httpRequest = httpRequest;
this.httpRequestFuture = httpRequestFuture;
this.streamId = streamId;
this.succeeded = succeeded;
this.failed = failed;
this.timeout = timeout;
this.startTime = startTime;
this.followRedirect = followRedirect;
this.maxRedirects = maxRedirects;
this.redirectCount = redirectCount;
this.latch = latch;
this.httpResponseListener = httpResponseListener;
this.exceptionListener = exceptionListener;
this.httpHeadersListener = httpHeadersListener;
@ -133,16 +125,13 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque
HttpRequestContext(URI uri, HttpRequest httpRequest, HttpRequestContext httpRequestContext) {
this.uri = uri;
this.httpRequest = httpRequest;
this.httpRequestFuture = httpRequestContext.httpRequestFuture;
this.streamId = httpRequestContext.streamId;
this.succeeded = httpRequestContext.succeeded;
this.failed = httpRequestContext.failed;
this.failed.lazySet(false); // reset
this.timeout = httpRequestContext.timeout;
this.startTime = httpRequestContext.startTime;
this.followRedirect = httpRequestContext.followRedirect;
this.maxRedirects = httpRequestContext.maxRedirects;
this.redirectCount = httpRequestContext.redirectCount;
this.latch = httpRequestContext.latch;
this.httpResponseListener = httpRequestContext.httpResponseListener;
this.exceptionListener = httpRequestContext.exceptionListener;
this.httpHeadersListener = httpRequestContext.httpHeadersListener;
@ -247,11 +236,11 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque
}
public boolean isSucceeded() {
return succeeded.get();
return httpRequestFuture.isSucceeded();
}
public boolean isFailed() {
return failed.get();
return httpRequestFuture.isFailed();
}
public boolean isFollowRedirect() {
@ -278,44 +267,36 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque
return (startTime + timeout) - System.currentTimeMillis();
}
public CountDownLatch getLatch() {
return latch;
}
public AtomicInteger getStreamId() {
return streamId;
}
public HttpRequestContext get() throws InterruptedException {
return waitFor(DEFAULT_TIMEOUT_MILLIS, TimeUnit.SECONDS);
public HttpRequestContext get() throws InterruptedException, TimeoutException, ExecutionException {
return get(DEFAULT_TIMEOUT_MILLIS, TimeUnit.SECONDS);
}
public HttpRequestContext waitFor(long value, TimeUnit timeUnit) throws InterruptedException {
this.hastimeout = latch.await(value, timeUnit);
public HttpRequestContext get(long timeout, TimeUnit timeUnit)
throws InterruptedException, TimeoutException, ExecutionException {
httpRequestFuture.get(timeout, timeUnit);
stopTime = System.currentTimeMillis();
return this;
}
public boolean isTimeout() {
return hastimeout;
}
public void success(String reason) {
logger.log(Level.FINE, () -> "success because of " + reason);
if (succeeded.compareAndSet(false, true)) {
latch.countDown();
}
httpRequestFuture.success(reason);
}
public void fail(String reason) {
logger.log(Level.FINE, () -> "failed because of " + reason);
IllegalStateException exception = new IllegalStateException(reason);
fail(new IllegalStateException(reason));
}
public void fail(Exception exception) {
logger.log(Level.FINE, () -> "failed because of " + exception.getMessage());
if (exceptionListener != null) {
exceptionListener.onException(exception);
}
if (failed.compareAndSet(false, true)) {
latch.countDown();
}
httpRequestFuture.fail(exception);
}
@Override

View file

@ -37,4 +37,6 @@ public interface HttpRequestDefaults {
int DEFAULT_TIMEOUT_MILLIS = 5000;
int DEFAULT_MAX_REDIRECT = 10;
HttpRequestFuture<String> DEFAULT_FUTURE = new HttpRequestFuture<>();
}

View file

@ -0,0 +1,20 @@
package org.xbib.netty.http.client;
import org.xbib.netty.http.client.util.AbstractFuture;
/**
* A HTTP request future.
*
* @param <V> the response type parameter.
*/
public class HttpRequestFuture<V> extends AbstractFuture<V> {
public void success(V v) {
set(v);
}
public void fail(Exception e) {
setException(e);
}
}

View file

@ -142,9 +142,9 @@ public class HttpClientChannelInitializer extends ChannelInitializer<SocketChann
new Http2ClientUpgradeCodec(http2connectionHandler);
HttpClientUpgradeHandler upgradeHandler =
new HttpClientUpgradeHandler(http1connectionHandler, upgradeCodec, context.getMaxContentLength());
pipeline.addLast(upgradeHandler);
UpgradeRequestHandler upgradeRequestHandler =
new UpgradeRequestHandler();
pipeline.addLast(upgradeHandler);
pipeline.addLast(upgradeRequestHandler);
} else {
pipeline.addLast(http2connectionHandler);

View file

@ -0,0 +1,353 @@
package org.xbib.netty.http.client.util;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
/**
* <p>
* An abstract implementation of the {@link Future} interface. This class
* is an abstraction of {@link java.util.concurrent.FutureTask} to support use
* for tasks other than {@link Runnable}s. It uses an
* {@link AbstractQueuedSynchronizer} to deal with concurrency issues and
* guarantee thread safety. It could be used as a base class to
* {@code FutureTask}, or any other implementor of the {@code Future} interface.
* </p>
*
* <p>
* This class implements all methods in {@code Future}. Subclasses should
* provide a way to set the result of the computation through the protected
* methods {@link #set(Object)}, {@link #setException(Exception)}, or
* {@link #cancel()}. If subclasses want to implement cancellation they can
* override the {@link #cancel(boolean)} method with a real implementation, the
* default implementation doesn't support cancellation.
* </p>
*
* <p>
* The state changing methods all return a boolean indicating success or
* failure in changing the future's state. Valid states are running,
* completed, failed, or cancelled. Because this class does not implement
* cancellation it is left to the subclass to distinguish between created
* and running tasks.
* </p>
*
* <p>This class is taken from the Google Guava project.</p>
*
* @param <V> the future value parameter type
*/
public abstract class AbstractFuture<V> implements Future<V> {
/**
* Synchronization control.
*/
private final Sync<V> sync = new Sync<>();
/**
* The default {@link AbstractFuture} implementation throws {@code
* InterruptedException} if the current thread is interrupted before or during
* the call, even if the value is already available.
*
* @throws InterruptedException if the current thread was interrupted before
* or during the call (optional but recommended).
* @throws TimeoutException if operation timed out
* @throws ExecutionException if execution fails
*/
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException,
TimeoutException, ExecutionException {
return sync.get(unit.toNanos(timeout));
}
/**
* The default {@link AbstractFuture} implementation throws {@code
* InterruptedException} if the current thread is interrupted before or during
* the call, even if the value is already available.
*
* @throws InterruptedException if the current thread was interrupted before
* or during the call (optional but recommended).
* @throws ExecutionException if execution fails
*/
@Override
public V get() throws InterruptedException, ExecutionException {
return sync.get();
}
/**
* Checks if the sync is not in the running state.
*/
@Override
public boolean isDone() {
return sync.isDone();
}
/**
* Checks if the sync is in the cancelled state.
*/
@Override
public boolean isCancelled() {
return sync.isCancelled();
}
public boolean isSucceeded() {
return sync.isSuccess();
}
public boolean isFailed() {
return sync.isFailed();
}
/**
* Default implementation of cancel that cancels the future.
*/
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (!sync.cancel()) {
return false;
}
done();
if (mayInterruptIfRunning) {
interruptTask();
}
return true;
}
/**
* Subclasses should invoke this method to set the result of the computation
* to {@code value}. This will set the state of the future to
* {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
* state was successfully changed.
*
* @param value the value that was the result of the task.
* @return true if the state was successfully changed.
*/
protected boolean set(V value) {
boolean result = sync.set(value);
if (result) {
done();
}
return result;
}
/**
* Subclasses should invoke this method to set the result of the computation
* to an error, {@code throwable}. This will set the state of the future to
* {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
* state was successfully changed.
*
* @param exception the exception that the task failed with.
* @return true if the state was successfully changed.
*/
protected boolean setException(Exception exception) {
boolean result = sync.setException(exception);
if (result) {
done();
}
return result;
}
/**
* Subclasses should invoke this method to mark the future as cancelled.
* This will set the state of the future to {@link
* AbstractFuture.Sync#CANCELLED} and call {@link #done()} if the state was
* successfully changed.
*
* @return true if the state was successfully changed.
*/
protected final boolean cancel() {
boolean result = sync.cancel();
if (result) {
done();
}
return result;
}
/**
* Called by the success, failed, or cancelled methods to indicate that the
* value is now available and the latch can be released. Subclasses can
* use this method to deal with any actions that should be undertaken when
* the task has completed.
*/
protected void done() {
}
/**
* Subclasses can override this method to implement interruption of the
* future's computation. The method is invoked automatically by a successful
* call to {@link #cancel(boolean) cancel(true)}.
* The default implementation does nothing.
*/
protected void interruptTask() {
}
/**
* <p>
* Following the contract of {@link AbstractQueuedSynchronizer} we create a
* private subclass to hold the synchronizer. This synchronizer is used to
* implement the blocking and waiting calls as well as to handle state changes
* in a thread-safe manner. The current state of the future is held in the
* Sync state, and the lock is released whenever the state changes to either
* {@link #COMPLETED} or {@link #CANCELLED}.
* </p>
* <p>
* To avoid races between threads doing release and acquire, we transition
* to the final state in two steps. One thread will successfully CAS from
* RUNNING to COMPLETING, that thread will then set the result of the
* computation, and only then transition to COMPLETED or CANCELLED.
* </p>
* <p>
* We don't use the integer argument passed between acquire methods so we
* pass around a -1 everywhere.
* </p>
*/
static final class Sync<V> extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -796072460488712821L;
static final int RUNNING = 0;
static final int COMPLETING = 1;
static final int COMPLETED = 2;
static final int CANCELLED = 4;
private V value;
private Exception exception;
/*
* Acquisition succeeds if the future is done, otherwise it fails.
*/
@Override
protected int tryAcquireShared(int ignored) {
return isDone() ? 1 : -1;
}
/*
* We always allow a release to go through, this means the state has been
* successfully changed and the result is available.
*/
@Override
protected boolean tryReleaseShared(int finalState) {
setState(finalState);
return true;
}
/**
* Blocks until the task is complete or the timeout expires. Throws a
* {@link TimeoutException} if the timer expires, otherwise behaves like
* {@link #get()}.
*/
V get(long nanos) throws TimeoutException, CancellationException,
ExecutionException, InterruptedException {
// Attempt to acquire the shared lock with a timeout.
if (!tryAcquireSharedNanos(-1, nanos)) {
throw new TimeoutException("Timeout waiting for task.");
}
return getValue();
}
/**
* Blocks until {@link #complete(Object, Exception, int)} has been
* successfully called. Throws a {@link CancellationException} if the task
* was cancelled, or a {@link ExecutionException} if the task completed with
* an error.
*/
V get() throws CancellationException, ExecutionException,
InterruptedException {
// Acquire the shared lock allowing interruption.
acquireSharedInterruptibly(-1);
return getValue();
}
/**
* Implementation of the actual value retrieval. Will return the value
* on success, an exception on failure, a cancellation on cancellation, or
* an illegal state if the synchronizer is in an invalid state.
*/
private V getValue() throws CancellationException, ExecutionException {
int state = getState();
switch (state) {
case COMPLETED:
if (exception != null) {
throw new ExecutionException(exception);
} else {
return value;
}
case CANCELLED:
throw new CancellationException("task was cancelled");
default:
throw new IllegalStateException("error, synchronizer in invalid state: " + state);
}
}
/**
* Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}.
*/
boolean isDone() {
return (getState() & (COMPLETED | CANCELLED)) != 0;
}
/**
* Checks if the state is {@link #CANCELLED}.
*/
boolean isCancelled() {
return getState() == CANCELLED;
}
boolean isSuccess() {
return value != null && getState() == COMPLETED;
}
boolean isFailed() {
return exception != null && getState() == COMPLETED;
}
/**
* Transition to the COMPLETED state and set the value.
*/
boolean set(V v) {
return complete(v, null, COMPLETED);
}
/**
* Transition to the COMPLETED state and set the exception.
*/
boolean setException(Exception exception) {
return complete(null, exception, COMPLETED);
}
/**
* Transition to the CANCELLED state.
*/
boolean cancel() {
return complete(null, null, CANCELLED);
}
/**
* Implementation of completing a task. Either {@code v} or {@code t} will
* be set but not both. The {@code finalState} is the state to change to
* from {@link #RUNNING}. If the state is not in the RUNNING state we
* return {@code false} after waiting for the state to be set to a valid
* final state ({@link #COMPLETED} or {@link #CANCELLED}).
*
* @param v the value to set as the result of the computation.
* @param exception the exception to set as the result of the computation.
* @param finalState the state to transition to.
*/
private boolean complete(V v, Exception exception, int finalState) {
boolean doCompletion = compareAndSetState(RUNNING, COMPLETING);
if (doCompletion) {
// If this thread successfully transitioned to COMPLETING, set the value
// and exception and then release to the final state.
this.value = v;
this.exception = exception;
releaseShared(finalState);
} else if (getState() == COMPLETING) {
// If some other thread is currently completing the future, block until
// they are done so we can guarantee completion.
acquireShared(-1);
}
return doCompletion;
}
}
}

View file

@ -21,6 +21,7 @@ import org.xbib.netty.http.client.HttpRequestBuilder;
import org.xbib.netty.http.client.HttpRequestContext;
import java.io.IOException;
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@ -31,6 +32,8 @@ import java.util.logging.LogManager;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import static org.junit.Assert.assertTrue;
/**
*/
public class ElasticsearchTest {
@ -56,6 +59,7 @@ public class ElasticsearchTest {
public void testElasticsearchCreateDocument() throws Exception {
HttpClient httpClient = HttpClient.builder()
.build();
try {
HttpRequestContext requestContext = httpClient.preparePut()
.setURL("http://localhost:9200/test/test/1")
.json("{\"text\":\"Hello World\"}")
@ -66,14 +70,19 @@ public class ElasticsearchTest {
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.execute()
.get();
httpClient.close();
logger.log(Level.FINE, "took = " + requestContext.took());
} catch (Exception exception) {
assertTrue(exception.getCause() instanceof ConnectException);
logger.log(Level.INFO, "got expected exception");
}
httpClient.close();
}
@Test
public void testElasticsearchMatchQuery() throws Exception {
HttpClient httpClient = HttpClient.builder()
.build();
try {
HttpRequestContext requestContext = httpClient.preparePost()
.setURL("http://localhost:9200/test/_search")
.json("{\"query\":{\"match\":{\"_all\":\"Hello World\"}}}")
@ -84,8 +93,12 @@ public class ElasticsearchTest {
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.execute()
.get();
httpClient.close();
logger.log(Level.FINE, "took = " + requestContext.took());
} catch (Exception exception) {
assertTrue(exception.getCause() instanceof ConnectException);
logger.log(Level.INFO, "got expected exception");
}
httpClient.close();
}
@Test
@ -103,9 +116,14 @@ public class ElasticsearchTest {
}
List<HttpRequestContext> responses = new ArrayList<>();
for (int i = 0; i < max; i++) {
try {
responses.add(contexts.get(i).get());
} catch (Exception exception) {
assertTrue(exception.getCause() instanceof ConnectException);
logger.log(Level.INFO, "got expected exception");
}
for (int i = 0; i < max; i++) {
}
for (int i = 0; i < responses.size(); i++) {
logger.log(Level.FINE, "took = " + responses.get(i).took());
}
httpClient.close();

View file

@ -18,6 +18,7 @@ package org.xbib.netty.http.client.test;
import org.junit.Test;
import org.xbib.netty.http.client.HttpClient;
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.util.logging.ConsoleHandler;
import java.util.logging.Handler;
@ -26,6 +27,8 @@ import java.util.logging.LogManager;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import static org.junit.Assert.assertTrue;
/**
*/
public class ExceptionTest {
@ -54,6 +57,7 @@ public class ExceptionTest {
HttpClient httpClient = HttpClient.builder()
.build();
try {
httpClient.prepareGet()
.setURL("http://localhost:1234")
.onResponse(fullHttpResponse -> {
@ -63,6 +67,10 @@ public class ExceptionTest {
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.execute()
.get();
} catch (Exception exception) {
assertTrue(exception.getCause() instanceof ConnectException);
logger.log(Level.INFO, "got expected exception");
}
httpClient.close();
}
}

View file

@ -52,6 +52,7 @@ public class IndexHbzTest {
private static final Logger logger = Logger.getLogger("");
@Test
public void testIndexHbz() throws Exception {
HttpClient httpClient = HttpClient.builder()
.build();