diff --git a/gradle.properties b/gradle.properties
index 4f1d786..8ac3c28 100644
--- a/gradle.properties
+++ b/gradle.properties
@@ -1,6 +1,6 @@
group = org.xbib
name = netty-http-client
-version = 4.1.11.2
+version = 4.1.11.3
netty.version = 4.1.11.Final
tcnative.version = 2.0.1.Final
diff --git a/src/main/java/org/xbib/netty/http/client/HttpClient.java b/src/main/java/org/xbib/netty/http/client/HttpClient.java
index eb84574..2f3ef8b 100755
--- a/src/main/java/org/xbib/netty/http/client/HttpClient.java
+++ b/src/main/java/org/xbib/netty/http/client/HttpClient.java
@@ -50,6 +50,7 @@ import org.xbib.netty.http.client.util.NetworkUtils;
import java.io.Closeable;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
+import java.net.ConnectException;
import java.net.URI;
import java.net.URLDecoder;
import java.util.Collection;
@@ -70,6 +71,13 @@ public final class HttpClient implements Closeable {
private static final HttpClient INSTANCE = HttpClient.builder().build();
+ static {
+ NetworkUtils.extendSystemProperties();
+ logger.log(Level.FINE, () -> "OpenSSL ALPN support: " + OpenSsl.isAlpnSupported());
+ logger.log(Level.FINE, () -> "local host name = " + NetworkUtils.getLocalHostName("localhost"));
+ logger.log(Level.FINE, NetworkUtils::displayNetworkInterfaces);
+ }
+
private final ByteBufAllocator byteBufAllocator;
private final EventLoopGroup eventLoopGroup;
@@ -87,10 +95,6 @@ public final class HttpClient implements Closeable {
this.byteBufAllocator = byteBufAllocator;
this.eventLoopGroup = eventLoopGroup;
this.poolMap = new HttpClientChannelPoolMap(this, httpClientChannelContext, bootstrap, maxConnections);
- logger.log(Level.FINE, () -> "OpenSSL ALPN support: " + OpenSsl.isAlpnSupported());
- NetworkUtils.extendSystemProperties();
- logger.log(Level.FINE, () -> "local host name = " + NetworkUtils.getLocalHostName("localhost"));
- logger.log(Level.FINE, NetworkUtils::displayNetworkInterfaces);
}
public static HttpClient getInstance() {
@@ -225,7 +229,7 @@ public final class HttpClient implements Closeable {
logger.log(Level.FINE, () -> "closing pool map");
poolMap.close();
logger.log(Level.FINE, () -> "closing event loop group");
- if (!eventLoopGroup.isTerminated()) {
+ if (!eventLoopGroup.isShuttingDown()) {
eventLoopGroup.shutdownGracefully();
}
logger.log(Level.FINE, () -> "closed");
@@ -360,7 +364,7 @@ public final class HttpClient implements Closeable {
if (exceptionListener != null) {
exceptionListener.onException(future.cause());
}
- httpRequestContext.fail("channel pool failure");
+ httpRequestContext.fail(new ConnectException("unable to connect to " + inetAddressKey));
}
});
}
diff --git a/src/main/java/org/xbib/netty/http/client/HttpClientRequestBuilder.java b/src/main/java/org/xbib/netty/http/client/HttpClientRequestBuilder.java
index bb51172..f7309d0 100644
--- a/src/main/java/org/xbib/netty/http/client/HttpClientRequestBuilder.java
+++ b/src/main/java/org/xbib/netty/http/client/HttpClientRequestBuilder.java
@@ -52,8 +52,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.logging.Level;
@@ -102,6 +100,8 @@ public class HttpClientRequestBuilder implements HttpRequestBuilder, HttpRequest
private HttpRequest httpRequest;
+ private HttpRequestFuture httpRequestFuture = DEFAULT_FUTURE;
+
private HttpRequestContext httpRequestContext;
private HttpResponseListener httpResponseListener;
@@ -141,6 +141,11 @@ public class HttpClientRequestBuilder implements HttpRequestBuilder, HttpRequest
return new HttpClientRequestBuilder(httpMethod, UnpooledByteBufAllocator.DEFAULT, 3);
}
+ public HttpRequestBuilder withFuture(HttpRequestFuture httpRequestFuture) {
+ this.httpRequestFuture = httpRequestFuture;
+ return this;
+ }
+
@Override
public HttpRequestBuilder setHttp1() {
this.httpVersion = HttpVersion.HTTP_1_1;
@@ -377,12 +382,11 @@ public class HttpClientRequestBuilder implements HttpRequestBuilder, HttpRequest
if (httpResponseListener == null) {
httpResponseListener = httpRequestContext;
}
- httpRequestContext = new HttpRequestContext(uri, httpRequest, streamId,
- new AtomicBoolean(false),
- new AtomicBoolean(false),
+ httpRequestContext = new HttpRequestContext(uri, httpRequest,
+ httpRequestFuture,
+ streamId,
timeout, System.currentTimeMillis(),
followRedirect, maxRedirects, new AtomicInteger(0),
- new CountDownLatch(1),
httpResponseListener,
exceptionListener,
httpHeadersListener,
diff --git a/src/main/java/org/xbib/netty/http/client/HttpRequestContext.java b/src/main/java/org/xbib/netty/http/client/HttpRequestContext.java
index 99bad33..b588f42 100755
--- a/src/main/java/org/xbib/netty/http/client/HttpRequestContext.java
+++ b/src/main/java/org/xbib/netty/http/client/HttpRequestContext.java
@@ -34,9 +34,9 @@ import java.util.AbstractMap;
import java.util.Collection;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
@@ -53,9 +53,7 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque
private final HttpRequest httpRequest;
- private final AtomicBoolean succeeded;
-
- private final AtomicBoolean failed;
+ private final HttpRequestFuture httpRequestFuture;
private final boolean followRedirect;
@@ -67,8 +65,6 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque
private final Long startTime;
- private final CountDownLatch latch;
-
private final AtomicInteger streamId;
private final HttpResponseListener httpResponseListener;
@@ -91,15 +87,13 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque
private Map httpResponses;
- private boolean hastimeout;
-
private Long stopTime;
- HttpRequestContext(URI uri, HttpRequest httpRequest, AtomicInteger streamId,
- AtomicBoolean succeeded, AtomicBoolean failed,
+ HttpRequestContext(URI uri, HttpRequest httpRequest,
+ HttpRequestFuture httpRequestFuture,
+ AtomicInteger streamId,
int timeout, Long startTime,
boolean followRedirect, int maxRedirects, AtomicInteger redirectCount,
- CountDownLatch latch,
HttpResponseListener httpResponseListener,
ExceptionListener exceptionListener,
HttpHeadersListener httpHeadersListener,
@@ -107,15 +101,13 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque
HttpPushListener httpPushListener) {
this.uri = uri;
this.httpRequest = httpRequest;
+ this.httpRequestFuture = httpRequestFuture;
this.streamId = streamId;
- this.succeeded = succeeded;
- this.failed = failed;
this.timeout = timeout;
this.startTime = startTime;
this.followRedirect = followRedirect;
this.maxRedirects = maxRedirects;
this.redirectCount = redirectCount;
- this.latch = latch;
this.httpResponseListener = httpResponseListener;
this.exceptionListener = exceptionListener;
this.httpHeadersListener = httpHeadersListener;
@@ -133,16 +125,13 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque
HttpRequestContext(URI uri, HttpRequest httpRequest, HttpRequestContext httpRequestContext) {
this.uri = uri;
this.httpRequest = httpRequest;
+ this.httpRequestFuture = httpRequestContext.httpRequestFuture;
this.streamId = httpRequestContext.streamId;
- this.succeeded = httpRequestContext.succeeded;
- this.failed = httpRequestContext.failed;
- this.failed.lazySet(false); // reset
this.timeout = httpRequestContext.timeout;
this.startTime = httpRequestContext.startTime;
this.followRedirect = httpRequestContext.followRedirect;
this.maxRedirects = httpRequestContext.maxRedirects;
this.redirectCount = httpRequestContext.redirectCount;
- this.latch = httpRequestContext.latch;
this.httpResponseListener = httpRequestContext.httpResponseListener;
this.exceptionListener = httpRequestContext.exceptionListener;
this.httpHeadersListener = httpRequestContext.httpHeadersListener;
@@ -247,11 +236,11 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque
}
public boolean isSucceeded() {
- return succeeded.get();
+ return httpRequestFuture.isSucceeded();
}
public boolean isFailed() {
- return failed.get();
+ return httpRequestFuture.isFailed();
}
public boolean isFollowRedirect() {
@@ -278,44 +267,36 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque
return (startTime + timeout) - System.currentTimeMillis();
}
- public CountDownLatch getLatch() {
- return latch;
- }
-
public AtomicInteger getStreamId() {
return streamId;
}
- public HttpRequestContext get() throws InterruptedException {
- return waitFor(DEFAULT_TIMEOUT_MILLIS, TimeUnit.SECONDS);
+ public HttpRequestContext get() throws InterruptedException, TimeoutException, ExecutionException {
+ return get(DEFAULT_TIMEOUT_MILLIS, TimeUnit.SECONDS);
}
- public HttpRequestContext waitFor(long value, TimeUnit timeUnit) throws InterruptedException {
- this.hastimeout = latch.await(value, timeUnit);
+ public HttpRequestContext get(long timeout, TimeUnit timeUnit)
+ throws InterruptedException, TimeoutException, ExecutionException {
+ httpRequestFuture.get(timeout, timeUnit);
stopTime = System.currentTimeMillis();
return this;
}
- public boolean isTimeout() {
- return hastimeout;
- }
-
public void success(String reason) {
logger.log(Level.FINE, () -> "success because of " + reason);
- if (succeeded.compareAndSet(false, true)) {
- latch.countDown();
- }
+ httpRequestFuture.success(reason);
}
public void fail(String reason) {
- logger.log(Level.FINE, () -> "failed because of " + reason);
- IllegalStateException exception = new IllegalStateException(reason);
+ fail(new IllegalStateException(reason));
+ }
+
+ public void fail(Exception exception) {
+ logger.log(Level.FINE, () -> "failed because of " + exception.getMessage());
if (exceptionListener != null) {
exceptionListener.onException(exception);
}
- if (failed.compareAndSet(false, true)) {
- latch.countDown();
- }
+ httpRequestFuture.fail(exception);
}
@Override
diff --git a/src/main/java/org/xbib/netty/http/client/HttpRequestDefaults.java b/src/main/java/org/xbib/netty/http/client/HttpRequestDefaults.java
index 694dbd9..66a704e 100644
--- a/src/main/java/org/xbib/netty/http/client/HttpRequestDefaults.java
+++ b/src/main/java/org/xbib/netty/http/client/HttpRequestDefaults.java
@@ -37,4 +37,6 @@ public interface HttpRequestDefaults {
int DEFAULT_TIMEOUT_MILLIS = 5000;
int DEFAULT_MAX_REDIRECT = 10;
+
+ HttpRequestFuture DEFAULT_FUTURE = new HttpRequestFuture<>();
}
diff --git a/src/main/java/org/xbib/netty/http/client/HttpRequestFuture.java b/src/main/java/org/xbib/netty/http/client/HttpRequestFuture.java
new file mode 100644
index 0000000..43217eb
--- /dev/null
+++ b/src/main/java/org/xbib/netty/http/client/HttpRequestFuture.java
@@ -0,0 +1,20 @@
+package org.xbib.netty.http.client;
+
+import org.xbib.netty.http.client.util.AbstractFuture;
+
+/**
+ * A HTTP request future.
+ *
+ * @param the response type parameter.
+ */
+public class HttpRequestFuture extends AbstractFuture {
+
+ public void success(V v) {
+ set(v);
+ }
+
+ public void fail(Exception e) {
+ setException(e);
+ }
+
+}
diff --git a/src/main/java/org/xbib/netty/http/client/handler/HttpClientChannelInitializer.java b/src/main/java/org/xbib/netty/http/client/handler/HttpClientChannelInitializer.java
index e168b87..7190818 100644
--- a/src/main/java/org/xbib/netty/http/client/handler/HttpClientChannelInitializer.java
+++ b/src/main/java/org/xbib/netty/http/client/handler/HttpClientChannelInitializer.java
@@ -142,9 +142,9 @@ public class HttpClientChannelInitializer extends ChannelInitializer
+ * An abstract implementation of the {@link Future} interface. This class
+ * is an abstraction of {@link java.util.concurrent.FutureTask} to support use
+ * for tasks other than {@link Runnable}s. It uses an
+ * {@link AbstractQueuedSynchronizer} to deal with concurrency issues and
+ * guarantee thread safety. It could be used as a base class to
+ * {@code FutureTask}, or any other implementor of the {@code Future} interface.
+ *
+ *
+ *
+ * This class implements all methods in {@code Future}. Subclasses should
+ * provide a way to set the result of the computation through the protected
+ * methods {@link #set(Object)}, {@link #setException(Exception)}, or
+ * {@link #cancel()}. If subclasses want to implement cancellation they can
+ * override the {@link #cancel(boolean)} method with a real implementation, the
+ * default implementation doesn't support cancellation.
+ *
+ *
+ *
+ * The state changing methods all return a boolean indicating success or
+ * failure in changing the future's state. Valid states are running,
+ * completed, failed, or cancelled. Because this class does not implement
+ * cancellation it is left to the subclass to distinguish between created
+ * and running tasks.
+ *
+ *
+ * This class is taken from the Google Guava project.
+ *
+ * @param the future value parameter type
+ */
+public abstract class AbstractFuture implements Future {
+
+ /**
+ * Synchronization control.
+ */
+ private final Sync sync = new Sync<>();
+
+ /**
+ * The default {@link AbstractFuture} implementation throws {@code
+ * InterruptedException} if the current thread is interrupted before or during
+ * the call, even if the value is already available.
+ *
+ * @throws InterruptedException if the current thread was interrupted before
+ * or during the call (optional but recommended).
+ * @throws TimeoutException if operation timed out
+ * @throws ExecutionException if execution fails
+ */
+ @Override
+ public V get(long timeout, TimeUnit unit) throws InterruptedException,
+ TimeoutException, ExecutionException {
+ return sync.get(unit.toNanos(timeout));
+ }
+
+ /**
+ * The default {@link AbstractFuture} implementation throws {@code
+ * InterruptedException} if the current thread is interrupted before or during
+ * the call, even if the value is already available.
+ *
+ * @throws InterruptedException if the current thread was interrupted before
+ * or during the call (optional but recommended).
+ * @throws ExecutionException if execution fails
+ */
+ @Override
+ public V get() throws InterruptedException, ExecutionException {
+ return sync.get();
+ }
+
+ /**
+ * Checks if the sync is not in the running state.
+ */
+ @Override
+ public boolean isDone() {
+ return sync.isDone();
+ }
+
+ /**
+ * Checks if the sync is in the cancelled state.
+ */
+ @Override
+ public boolean isCancelled() {
+ return sync.isCancelled();
+ }
+
+ public boolean isSucceeded() {
+ return sync.isSuccess();
+ }
+
+ public boolean isFailed() {
+ return sync.isFailed();
+ }
+
+ /**
+ * Default implementation of cancel that cancels the future.
+ */
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ if (!sync.cancel()) {
+ return false;
+ }
+ done();
+ if (mayInterruptIfRunning) {
+ interruptTask();
+ }
+ return true;
+ }
+
+ /**
+ * Subclasses should invoke this method to set the result of the computation
+ * to {@code value}. This will set the state of the future to
+ * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
+ * state was successfully changed.
+ *
+ * @param value the value that was the result of the task.
+ * @return true if the state was successfully changed.
+ */
+ protected boolean set(V value) {
+ boolean result = sync.set(value);
+ if (result) {
+ done();
+ }
+ return result;
+ }
+
+ /**
+ * Subclasses should invoke this method to set the result of the computation
+ * to an error, {@code throwable}. This will set the state of the future to
+ * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the
+ * state was successfully changed.
+ *
+ * @param exception the exception that the task failed with.
+ * @return true if the state was successfully changed.
+ */
+ protected boolean setException(Exception exception) {
+ boolean result = sync.setException(exception);
+ if (result) {
+ done();
+ }
+ return result;
+ }
+
+ /**
+ * Subclasses should invoke this method to mark the future as cancelled.
+ * This will set the state of the future to {@link
+ * AbstractFuture.Sync#CANCELLED} and call {@link #done()} if the state was
+ * successfully changed.
+ *
+ * @return true if the state was successfully changed.
+ */
+ protected final boolean cancel() {
+ boolean result = sync.cancel();
+ if (result) {
+ done();
+ }
+ return result;
+ }
+
+ /**
+ * Called by the success, failed, or cancelled methods to indicate that the
+ * value is now available and the latch can be released. Subclasses can
+ * use this method to deal with any actions that should be undertaken when
+ * the task has completed.
+ */
+ protected void done() {
+ }
+
+ /**
+ * Subclasses can override this method to implement interruption of the
+ * future's computation. The method is invoked automatically by a successful
+ * call to {@link #cancel(boolean) cancel(true)}.
+ * The default implementation does nothing.
+ */
+ protected void interruptTask() {
+ }
+
+ /**
+ *
+ * Following the contract of {@link AbstractQueuedSynchronizer} we create a
+ * private subclass to hold the synchronizer. This synchronizer is used to
+ * implement the blocking and waiting calls as well as to handle state changes
+ * in a thread-safe manner. The current state of the future is held in the
+ * Sync state, and the lock is released whenever the state changes to either
+ * {@link #COMPLETED} or {@link #CANCELLED}.
+ *
+ *
+ * To avoid races between threads doing release and acquire, we transition
+ * to the final state in two steps. One thread will successfully CAS from
+ * RUNNING to COMPLETING, that thread will then set the result of the
+ * computation, and only then transition to COMPLETED or CANCELLED.
+ *
+ *
+ * We don't use the integer argument passed between acquire methods so we
+ * pass around a -1 everywhere.
+ *
+ */
+ static final class Sync extends AbstractQueuedSynchronizer {
+
+ private static final long serialVersionUID = -796072460488712821L;
+
+ static final int RUNNING = 0;
+ static final int COMPLETING = 1;
+ static final int COMPLETED = 2;
+ static final int CANCELLED = 4;
+
+ private V value;
+ private Exception exception;
+
+ /*
+ * Acquisition succeeds if the future is done, otherwise it fails.
+ */
+ @Override
+ protected int tryAcquireShared(int ignored) {
+ return isDone() ? 1 : -1;
+ }
+
+ /*
+ * We always allow a release to go through, this means the state has been
+ * successfully changed and the result is available.
+ */
+ @Override
+ protected boolean tryReleaseShared(int finalState) {
+ setState(finalState);
+ return true;
+ }
+
+ /**
+ * Blocks until the task is complete or the timeout expires. Throws a
+ * {@link TimeoutException} if the timer expires, otherwise behaves like
+ * {@link #get()}.
+ */
+ V get(long nanos) throws TimeoutException, CancellationException,
+ ExecutionException, InterruptedException {
+ // Attempt to acquire the shared lock with a timeout.
+ if (!tryAcquireSharedNanos(-1, nanos)) {
+ throw new TimeoutException("Timeout waiting for task.");
+ }
+ return getValue();
+ }
+
+ /**
+ * Blocks until {@link #complete(Object, Exception, int)} has been
+ * successfully called. Throws a {@link CancellationException} if the task
+ * was cancelled, or a {@link ExecutionException} if the task completed with
+ * an error.
+ */
+ V get() throws CancellationException, ExecutionException,
+ InterruptedException {
+ // Acquire the shared lock allowing interruption.
+ acquireSharedInterruptibly(-1);
+ return getValue();
+ }
+
+ /**
+ * Implementation of the actual value retrieval. Will return the value
+ * on success, an exception on failure, a cancellation on cancellation, or
+ * an illegal state if the synchronizer is in an invalid state.
+ */
+ private V getValue() throws CancellationException, ExecutionException {
+ int state = getState();
+ switch (state) {
+ case COMPLETED:
+ if (exception != null) {
+ throw new ExecutionException(exception);
+ } else {
+ return value;
+ }
+ case CANCELLED:
+ throw new CancellationException("task was cancelled");
+ default:
+ throw new IllegalStateException("error, synchronizer in invalid state: " + state);
+ }
+ }
+
+ /**
+ * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}.
+ */
+ boolean isDone() {
+ return (getState() & (COMPLETED | CANCELLED)) != 0;
+ }
+
+ /**
+ * Checks if the state is {@link #CANCELLED}.
+ */
+ boolean isCancelled() {
+ return getState() == CANCELLED;
+ }
+
+ boolean isSuccess() {
+ return value != null && getState() == COMPLETED;
+ }
+
+ boolean isFailed() {
+ return exception != null && getState() == COMPLETED;
+ }
+
+ /**
+ * Transition to the COMPLETED state and set the value.
+ */
+ boolean set(V v) {
+ return complete(v, null, COMPLETED);
+ }
+
+ /**
+ * Transition to the COMPLETED state and set the exception.
+ */
+ boolean setException(Exception exception) {
+ return complete(null, exception, COMPLETED);
+ }
+
+ /**
+ * Transition to the CANCELLED state.
+ */
+ boolean cancel() {
+ return complete(null, null, CANCELLED);
+ }
+
+ /**
+ * Implementation of completing a task. Either {@code v} or {@code t} will
+ * be set but not both. The {@code finalState} is the state to change to
+ * from {@link #RUNNING}. If the state is not in the RUNNING state we
+ * return {@code false} after waiting for the state to be set to a valid
+ * final state ({@link #COMPLETED} or {@link #CANCELLED}).
+ *
+ * @param v the value to set as the result of the computation.
+ * @param exception the exception to set as the result of the computation.
+ * @param finalState the state to transition to.
+ */
+ private boolean complete(V v, Exception exception, int finalState) {
+ boolean doCompletion = compareAndSetState(RUNNING, COMPLETING);
+ if (doCompletion) {
+ // If this thread successfully transitioned to COMPLETING, set the value
+ // and exception and then release to the final state.
+ this.value = v;
+ this.exception = exception;
+ releaseShared(finalState);
+ } else if (getState() == COMPLETING) {
+ // If some other thread is currently completing the future, block until
+ // they are done so we can guarantee completion.
+ acquireShared(-1);
+ }
+ return doCompletion;
+ }
+ }
+}
diff --git a/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java b/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java
index 6298d3d..49130a2 100644
--- a/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java
+++ b/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java
@@ -21,6 +21,7 @@ import org.xbib.netty.http.client.HttpRequestBuilder;
import org.xbib.netty.http.client.HttpRequestContext;
import java.io.IOException;
+import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@@ -31,6 +32,8 @@ import java.util.logging.LogManager;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
+import static org.junit.Assert.assertTrue;
+
/**
*/
public class ElasticsearchTest {
@@ -56,7 +59,8 @@ public class ElasticsearchTest {
public void testElasticsearchCreateDocument() throws Exception {
HttpClient httpClient = HttpClient.builder()
.build();
- HttpRequestContext requestContext = httpClient.preparePut()
+ try {
+ HttpRequestContext requestContext = httpClient.preparePut()
.setURL("http://localhost:9200/test/test/1")
.json("{\"text\":\"Hello World\"}")
.onResponse(fullHttpResponse -> {
@@ -66,15 +70,20 @@ public class ElasticsearchTest {
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.execute()
.get();
+ logger.log(Level.FINE, "took = " + requestContext.took());
+ } catch (Exception exception) {
+ assertTrue(exception.getCause() instanceof ConnectException);
+ logger.log(Level.INFO, "got expected exception");
+ }
httpClient.close();
- logger.log(Level.FINE, "took = " + requestContext.took());
}
@Test
public void testElasticsearchMatchQuery() throws Exception {
HttpClient httpClient = HttpClient.builder()
.build();
- HttpRequestContext requestContext = httpClient.preparePost()
+ try {
+ HttpRequestContext requestContext = httpClient.preparePost()
.setURL("http://localhost:9200/test/_search")
.json("{\"query\":{\"match\":{\"_all\":\"Hello World\"}}}")
.onResponse(fullHttpResponse -> {
@@ -84,8 +93,12 @@ public class ElasticsearchTest {
.onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
.execute()
.get();
+ logger.log(Level.FINE, "took = " + requestContext.took());
+ } catch (Exception exception) {
+ assertTrue(exception.getCause() instanceof ConnectException);
+ logger.log(Level.INFO, "got expected exception");
+ }
httpClient.close();
- logger.log(Level.FINE, "took = " + requestContext.took());
}
@Test
@@ -103,9 +116,14 @@ public class ElasticsearchTest {
}
List responses = new ArrayList<>();
for (int i = 0; i < max; i++) {
- responses.add(contexts.get(i).get());
+ try {
+ responses.add(contexts.get(i).get());
+ } catch (Exception exception) {
+ assertTrue(exception.getCause() instanceof ConnectException);
+ logger.log(Level.INFO, "got expected exception");
+ }
}
- for (int i = 0; i < max; i++) {
+ for (int i = 0; i < responses.size(); i++) {
logger.log(Level.FINE, "took = " + responses.get(i).took());
}
httpClient.close();
diff --git a/src/test/java/org/xbib/netty/http/client/test/ExceptionTest.java b/src/test/java/org/xbib/netty/http/client/test/ExceptionTest.java
index cfa5b57..130aca4 100644
--- a/src/test/java/org/xbib/netty/http/client/test/ExceptionTest.java
+++ b/src/test/java/org/xbib/netty/http/client/test/ExceptionTest.java
@@ -18,6 +18,7 @@ package org.xbib.netty.http.client.test;
import org.junit.Test;
import org.xbib.netty.http.client.HttpClient;
+import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.util.logging.ConsoleHandler;
import java.util.logging.Handler;
@@ -26,6 +27,8 @@ import java.util.logging.LogManager;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
+import static org.junit.Assert.assertTrue;
+
/**
*/
public class ExceptionTest {
@@ -54,15 +57,20 @@ public class ExceptionTest {
HttpClient httpClient = HttpClient.builder()
.build();
- httpClient.prepareGet()
- .setURL("http://localhost:1234")
- .onResponse(fullHttpResponse -> {
- String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
- logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
- })
- .onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
- .execute()
- .get();
+ try {
+ httpClient.prepareGet()
+ .setURL("http://localhost:1234")
+ .onResponse(fullHttpResponse -> {
+ String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8);
+ logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response);
+ })
+ .onException(e -> logger.log(Level.SEVERE, e.getMessage(), e))
+ .execute()
+ .get();
+ } catch (Exception exception) {
+ assertTrue(exception.getCause() instanceof ConnectException);
+ logger.log(Level.INFO, "got expected exception");
+ }
httpClient.close();
}
}
diff --git a/src/test/java/org/xbib/netty/http/client/test/IndexHbzTest.java b/src/test/java/org/xbib/netty/http/client/test/IndexHbzTest.java
index 580f83b..d21da8c 100644
--- a/src/test/java/org/xbib/netty/http/client/test/IndexHbzTest.java
+++ b/src/test/java/org/xbib/netty/http/client/test/IndexHbzTest.java
@@ -52,6 +52,7 @@ public class IndexHbzTest {
private static final Logger logger = Logger.getLogger("");
+ @Test
public void testIndexHbz() throws Exception {
HttpClient httpClient = HttpClient.builder()
.build();