From 7af00b68692595b8e872d4181cb6c65bdbd3f24a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=CC=88rg=20Prante?= Date: Tue, 30 May 2017 00:24:22 +0200 Subject: [PATCH] replace latch with future --- gradle.properties | 2 +- .../xbib/netty/http/client/HttpClient.java | 16 +- .../http/client/HttpClientRequestBuilder.java | 16 +- .../netty/http/client/HttpRequestContext.java | 63 ++-- .../http/client/HttpRequestDefaults.java | 2 + .../netty/http/client/HttpRequestFuture.java | 20 + .../handler/HttpClientChannelInitializer.java | 2 +- .../http/client/util/AbstractFuture.java | 353 ++++++++++++++++++ .../http/client/test/ElasticsearchTest.java | 30 +- .../netty/http/client/test/ExceptionTest.java | 26 +- .../netty/http/client/test/IndexHbzTest.java | 1 + 11 files changed, 461 insertions(+), 70 deletions(-) create mode 100644 src/main/java/org/xbib/netty/http/client/HttpRequestFuture.java create mode 100644 src/main/java/org/xbib/netty/http/client/util/AbstractFuture.java diff --git a/gradle.properties b/gradle.properties index 4f1d786..8ac3c28 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,6 +1,6 @@ group = org.xbib name = netty-http-client -version = 4.1.11.2 +version = 4.1.11.3 netty.version = 4.1.11.Final tcnative.version = 2.0.1.Final diff --git a/src/main/java/org/xbib/netty/http/client/HttpClient.java b/src/main/java/org/xbib/netty/http/client/HttpClient.java index eb84574..2f3ef8b 100755 --- a/src/main/java/org/xbib/netty/http/client/HttpClient.java +++ b/src/main/java/org/xbib/netty/http/client/HttpClient.java @@ -50,6 +50,7 @@ import org.xbib.netty.http.client.util.NetworkUtils; import java.io.Closeable; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.net.ConnectException; import java.net.URI; import java.net.URLDecoder; import java.util.Collection; @@ -70,6 +71,13 @@ public final class HttpClient implements Closeable { private static final HttpClient INSTANCE = HttpClient.builder().build(); + static { + NetworkUtils.extendSystemProperties(); + logger.log(Level.FINE, () -> "OpenSSL ALPN support: " + OpenSsl.isAlpnSupported()); + logger.log(Level.FINE, () -> "local host name = " + NetworkUtils.getLocalHostName("localhost")); + logger.log(Level.FINE, NetworkUtils::displayNetworkInterfaces); + } + private final ByteBufAllocator byteBufAllocator; private final EventLoopGroup eventLoopGroup; @@ -87,10 +95,6 @@ public final class HttpClient implements Closeable { this.byteBufAllocator = byteBufAllocator; this.eventLoopGroup = eventLoopGroup; this.poolMap = new HttpClientChannelPoolMap(this, httpClientChannelContext, bootstrap, maxConnections); - logger.log(Level.FINE, () -> "OpenSSL ALPN support: " + OpenSsl.isAlpnSupported()); - NetworkUtils.extendSystemProperties(); - logger.log(Level.FINE, () -> "local host name = " + NetworkUtils.getLocalHostName("localhost")); - logger.log(Level.FINE, NetworkUtils::displayNetworkInterfaces); } public static HttpClient getInstance() { @@ -225,7 +229,7 @@ public final class HttpClient implements Closeable { logger.log(Level.FINE, () -> "closing pool map"); poolMap.close(); logger.log(Level.FINE, () -> "closing event loop group"); - if (!eventLoopGroup.isTerminated()) { + if (!eventLoopGroup.isShuttingDown()) { eventLoopGroup.shutdownGracefully(); } logger.log(Level.FINE, () -> "closed"); @@ -360,7 +364,7 @@ public final class HttpClient implements Closeable { if (exceptionListener != null) { exceptionListener.onException(future.cause()); } - httpRequestContext.fail("channel pool failure"); + httpRequestContext.fail(new ConnectException("unable to connect to " + inetAddressKey)); } }); } diff --git a/src/main/java/org/xbib/netty/http/client/HttpClientRequestBuilder.java b/src/main/java/org/xbib/netty/http/client/HttpClientRequestBuilder.java index bb51172..f7309d0 100644 --- a/src/main/java/org/xbib/netty/http/client/HttpClientRequestBuilder.java +++ b/src/main/java/org/xbib/netty/http/client/HttpClientRequestBuilder.java @@ -52,8 +52,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import java.util.logging.Level; @@ -102,6 +100,8 @@ public class HttpClientRequestBuilder implements HttpRequestBuilder, HttpRequest private HttpRequest httpRequest; + private HttpRequestFuture httpRequestFuture = DEFAULT_FUTURE; + private HttpRequestContext httpRequestContext; private HttpResponseListener httpResponseListener; @@ -141,6 +141,11 @@ public class HttpClientRequestBuilder implements HttpRequestBuilder, HttpRequest return new HttpClientRequestBuilder(httpMethod, UnpooledByteBufAllocator.DEFAULT, 3); } + public HttpRequestBuilder withFuture(HttpRequestFuture httpRequestFuture) { + this.httpRequestFuture = httpRequestFuture; + return this; + } + @Override public HttpRequestBuilder setHttp1() { this.httpVersion = HttpVersion.HTTP_1_1; @@ -377,12 +382,11 @@ public class HttpClientRequestBuilder implements HttpRequestBuilder, HttpRequest if (httpResponseListener == null) { httpResponseListener = httpRequestContext; } - httpRequestContext = new HttpRequestContext(uri, httpRequest, streamId, - new AtomicBoolean(false), - new AtomicBoolean(false), + httpRequestContext = new HttpRequestContext(uri, httpRequest, + httpRequestFuture, + streamId, timeout, System.currentTimeMillis(), followRedirect, maxRedirects, new AtomicInteger(0), - new CountDownLatch(1), httpResponseListener, exceptionListener, httpHeadersListener, diff --git a/src/main/java/org/xbib/netty/http/client/HttpRequestContext.java b/src/main/java/org/xbib/netty/http/client/HttpRequestContext.java index 99bad33..b588f42 100755 --- a/src/main/java/org/xbib/netty/http/client/HttpRequestContext.java +++ b/src/main/java/org/xbib/netty/http/client/HttpRequestContext.java @@ -34,9 +34,9 @@ import java.util.AbstractMap; import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; @@ -53,9 +53,7 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque private final HttpRequest httpRequest; - private final AtomicBoolean succeeded; - - private final AtomicBoolean failed; + private final HttpRequestFuture httpRequestFuture; private final boolean followRedirect; @@ -67,8 +65,6 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque private final Long startTime; - private final CountDownLatch latch; - private final AtomicInteger streamId; private final HttpResponseListener httpResponseListener; @@ -91,15 +87,13 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque private Map httpResponses; - private boolean hastimeout; - private Long stopTime; - HttpRequestContext(URI uri, HttpRequest httpRequest, AtomicInteger streamId, - AtomicBoolean succeeded, AtomicBoolean failed, + HttpRequestContext(URI uri, HttpRequest httpRequest, + HttpRequestFuture httpRequestFuture, + AtomicInteger streamId, int timeout, Long startTime, boolean followRedirect, int maxRedirects, AtomicInteger redirectCount, - CountDownLatch latch, HttpResponseListener httpResponseListener, ExceptionListener exceptionListener, HttpHeadersListener httpHeadersListener, @@ -107,15 +101,13 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque HttpPushListener httpPushListener) { this.uri = uri; this.httpRequest = httpRequest; + this.httpRequestFuture = httpRequestFuture; this.streamId = streamId; - this.succeeded = succeeded; - this.failed = failed; this.timeout = timeout; this.startTime = startTime; this.followRedirect = followRedirect; this.maxRedirects = maxRedirects; this.redirectCount = redirectCount; - this.latch = latch; this.httpResponseListener = httpResponseListener; this.exceptionListener = exceptionListener; this.httpHeadersListener = httpHeadersListener; @@ -133,16 +125,13 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque HttpRequestContext(URI uri, HttpRequest httpRequest, HttpRequestContext httpRequestContext) { this.uri = uri; this.httpRequest = httpRequest; + this.httpRequestFuture = httpRequestContext.httpRequestFuture; this.streamId = httpRequestContext.streamId; - this.succeeded = httpRequestContext.succeeded; - this.failed = httpRequestContext.failed; - this.failed.lazySet(false); // reset this.timeout = httpRequestContext.timeout; this.startTime = httpRequestContext.startTime; this.followRedirect = httpRequestContext.followRedirect; this.maxRedirects = httpRequestContext.maxRedirects; this.redirectCount = httpRequestContext.redirectCount; - this.latch = httpRequestContext.latch; this.httpResponseListener = httpRequestContext.httpResponseListener; this.exceptionListener = httpRequestContext.exceptionListener; this.httpHeadersListener = httpRequestContext.httpHeadersListener; @@ -247,11 +236,11 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque } public boolean isSucceeded() { - return succeeded.get(); + return httpRequestFuture.isSucceeded(); } public boolean isFailed() { - return failed.get(); + return httpRequestFuture.isFailed(); } public boolean isFollowRedirect() { @@ -278,44 +267,36 @@ public final class HttpRequestContext implements HttpResponseListener, HttpReque return (startTime + timeout) - System.currentTimeMillis(); } - public CountDownLatch getLatch() { - return latch; - } - public AtomicInteger getStreamId() { return streamId; } - public HttpRequestContext get() throws InterruptedException { - return waitFor(DEFAULT_TIMEOUT_MILLIS, TimeUnit.SECONDS); + public HttpRequestContext get() throws InterruptedException, TimeoutException, ExecutionException { + return get(DEFAULT_TIMEOUT_MILLIS, TimeUnit.SECONDS); } - public HttpRequestContext waitFor(long value, TimeUnit timeUnit) throws InterruptedException { - this.hastimeout = latch.await(value, timeUnit); + public HttpRequestContext get(long timeout, TimeUnit timeUnit) + throws InterruptedException, TimeoutException, ExecutionException { + httpRequestFuture.get(timeout, timeUnit); stopTime = System.currentTimeMillis(); return this; } - public boolean isTimeout() { - return hastimeout; - } - public void success(String reason) { logger.log(Level.FINE, () -> "success because of " + reason); - if (succeeded.compareAndSet(false, true)) { - latch.countDown(); - } + httpRequestFuture.success(reason); } public void fail(String reason) { - logger.log(Level.FINE, () -> "failed because of " + reason); - IllegalStateException exception = new IllegalStateException(reason); + fail(new IllegalStateException(reason)); + } + + public void fail(Exception exception) { + logger.log(Level.FINE, () -> "failed because of " + exception.getMessage()); if (exceptionListener != null) { exceptionListener.onException(exception); } - if (failed.compareAndSet(false, true)) { - latch.countDown(); - } + httpRequestFuture.fail(exception); } @Override diff --git a/src/main/java/org/xbib/netty/http/client/HttpRequestDefaults.java b/src/main/java/org/xbib/netty/http/client/HttpRequestDefaults.java index 694dbd9..66a704e 100644 --- a/src/main/java/org/xbib/netty/http/client/HttpRequestDefaults.java +++ b/src/main/java/org/xbib/netty/http/client/HttpRequestDefaults.java @@ -37,4 +37,6 @@ public interface HttpRequestDefaults { int DEFAULT_TIMEOUT_MILLIS = 5000; int DEFAULT_MAX_REDIRECT = 10; + + HttpRequestFuture DEFAULT_FUTURE = new HttpRequestFuture<>(); } diff --git a/src/main/java/org/xbib/netty/http/client/HttpRequestFuture.java b/src/main/java/org/xbib/netty/http/client/HttpRequestFuture.java new file mode 100644 index 0000000..43217eb --- /dev/null +++ b/src/main/java/org/xbib/netty/http/client/HttpRequestFuture.java @@ -0,0 +1,20 @@ +package org.xbib.netty.http.client; + +import org.xbib.netty.http.client.util.AbstractFuture; + +/** + * A HTTP request future. + * + * @param the response type parameter. + */ +public class HttpRequestFuture extends AbstractFuture { + + public void success(V v) { + set(v); + } + + public void fail(Exception e) { + setException(e); + } + +} diff --git a/src/main/java/org/xbib/netty/http/client/handler/HttpClientChannelInitializer.java b/src/main/java/org/xbib/netty/http/client/handler/HttpClientChannelInitializer.java index e168b87..7190818 100644 --- a/src/main/java/org/xbib/netty/http/client/handler/HttpClientChannelInitializer.java +++ b/src/main/java/org/xbib/netty/http/client/handler/HttpClientChannelInitializer.java @@ -142,9 +142,9 @@ public class HttpClientChannelInitializer extends ChannelInitializer + * An abstract implementation of the {@link Future} interface. This class + * is an abstraction of {@link java.util.concurrent.FutureTask} to support use + * for tasks other than {@link Runnable}s. It uses an + * {@link AbstractQueuedSynchronizer} to deal with concurrency issues and + * guarantee thread safety. It could be used as a base class to + * {@code FutureTask}, or any other implementor of the {@code Future} interface. + *

+ * + *

+ * This class implements all methods in {@code Future}. Subclasses should + * provide a way to set the result of the computation through the protected + * methods {@link #set(Object)}, {@link #setException(Exception)}, or + * {@link #cancel()}. If subclasses want to implement cancellation they can + * override the {@link #cancel(boolean)} method with a real implementation, the + * default implementation doesn't support cancellation. + *

+ * + *

+ * The state changing methods all return a boolean indicating success or + * failure in changing the future's state. Valid states are running, + * completed, failed, or cancelled. Because this class does not implement + * cancellation it is left to the subclass to distinguish between created + * and running tasks. + *

+ * + *

This class is taken from the Google Guava project.

+ * + * @param the future value parameter type + */ +public abstract class AbstractFuture implements Future { + + /** + * Synchronization control. + */ + private final Sync sync = new Sync<>(); + + /** + * The default {@link AbstractFuture} implementation throws {@code + * InterruptedException} if the current thread is interrupted before or during + * the call, even if the value is already available. + * + * @throws InterruptedException if the current thread was interrupted before + * or during the call (optional but recommended). + * @throws TimeoutException if operation timed out + * @throws ExecutionException if execution fails + */ + @Override + public V get(long timeout, TimeUnit unit) throws InterruptedException, + TimeoutException, ExecutionException { + return sync.get(unit.toNanos(timeout)); + } + + /** + * The default {@link AbstractFuture} implementation throws {@code + * InterruptedException} if the current thread is interrupted before or during + * the call, even if the value is already available. + * + * @throws InterruptedException if the current thread was interrupted before + * or during the call (optional but recommended). + * @throws ExecutionException if execution fails + */ + @Override + public V get() throws InterruptedException, ExecutionException { + return sync.get(); + } + + /** + * Checks if the sync is not in the running state. + */ + @Override + public boolean isDone() { + return sync.isDone(); + } + + /** + * Checks if the sync is in the cancelled state. + */ + @Override + public boolean isCancelled() { + return sync.isCancelled(); + } + + public boolean isSucceeded() { + return sync.isSuccess(); + } + + public boolean isFailed() { + return sync.isFailed(); + } + + /** + * Default implementation of cancel that cancels the future. + */ + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (!sync.cancel()) { + return false; + } + done(); + if (mayInterruptIfRunning) { + interruptTask(); + } + return true; + } + + /** + * Subclasses should invoke this method to set the result of the computation + * to {@code value}. This will set the state of the future to + * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the + * state was successfully changed. + * + * @param value the value that was the result of the task. + * @return true if the state was successfully changed. + */ + protected boolean set(V value) { + boolean result = sync.set(value); + if (result) { + done(); + } + return result; + } + + /** + * Subclasses should invoke this method to set the result of the computation + * to an error, {@code throwable}. This will set the state of the future to + * {@link AbstractFuture.Sync#COMPLETED} and call {@link #done()} if the + * state was successfully changed. + * + * @param exception the exception that the task failed with. + * @return true if the state was successfully changed. + */ + protected boolean setException(Exception exception) { + boolean result = sync.setException(exception); + if (result) { + done(); + } + return result; + } + + /** + * Subclasses should invoke this method to mark the future as cancelled. + * This will set the state of the future to {@link + * AbstractFuture.Sync#CANCELLED} and call {@link #done()} if the state was + * successfully changed. + * + * @return true if the state was successfully changed. + */ + protected final boolean cancel() { + boolean result = sync.cancel(); + if (result) { + done(); + } + return result; + } + + /** + * Called by the success, failed, or cancelled methods to indicate that the + * value is now available and the latch can be released. Subclasses can + * use this method to deal with any actions that should be undertaken when + * the task has completed. + */ + protected void done() { + } + + /** + * Subclasses can override this method to implement interruption of the + * future's computation. The method is invoked automatically by a successful + * call to {@link #cancel(boolean) cancel(true)}. + * The default implementation does nothing. + */ + protected void interruptTask() { + } + + /** + *

+ * Following the contract of {@link AbstractQueuedSynchronizer} we create a + * private subclass to hold the synchronizer. This synchronizer is used to + * implement the blocking and waiting calls as well as to handle state changes + * in a thread-safe manner. The current state of the future is held in the + * Sync state, and the lock is released whenever the state changes to either + * {@link #COMPLETED} or {@link #CANCELLED}. + *

+ *

+ * To avoid races between threads doing release and acquire, we transition + * to the final state in two steps. One thread will successfully CAS from + * RUNNING to COMPLETING, that thread will then set the result of the + * computation, and only then transition to COMPLETED or CANCELLED. + *

+ *

+ * We don't use the integer argument passed between acquire methods so we + * pass around a -1 everywhere. + *

+ */ + static final class Sync extends AbstractQueuedSynchronizer { + + private static final long serialVersionUID = -796072460488712821L; + + static final int RUNNING = 0; + static final int COMPLETING = 1; + static final int COMPLETED = 2; + static final int CANCELLED = 4; + + private V value; + private Exception exception; + + /* + * Acquisition succeeds if the future is done, otherwise it fails. + */ + @Override + protected int tryAcquireShared(int ignored) { + return isDone() ? 1 : -1; + } + + /* + * We always allow a release to go through, this means the state has been + * successfully changed and the result is available. + */ + @Override + protected boolean tryReleaseShared(int finalState) { + setState(finalState); + return true; + } + + /** + * Blocks until the task is complete or the timeout expires. Throws a + * {@link TimeoutException} if the timer expires, otherwise behaves like + * {@link #get()}. + */ + V get(long nanos) throws TimeoutException, CancellationException, + ExecutionException, InterruptedException { + // Attempt to acquire the shared lock with a timeout. + if (!tryAcquireSharedNanos(-1, nanos)) { + throw new TimeoutException("Timeout waiting for task."); + } + return getValue(); + } + + /** + * Blocks until {@link #complete(Object, Exception, int)} has been + * successfully called. Throws a {@link CancellationException} if the task + * was cancelled, or a {@link ExecutionException} if the task completed with + * an error. + */ + V get() throws CancellationException, ExecutionException, + InterruptedException { + // Acquire the shared lock allowing interruption. + acquireSharedInterruptibly(-1); + return getValue(); + } + + /** + * Implementation of the actual value retrieval. Will return the value + * on success, an exception on failure, a cancellation on cancellation, or + * an illegal state if the synchronizer is in an invalid state. + */ + private V getValue() throws CancellationException, ExecutionException { + int state = getState(); + switch (state) { + case COMPLETED: + if (exception != null) { + throw new ExecutionException(exception); + } else { + return value; + } + case CANCELLED: + throw new CancellationException("task was cancelled"); + default: + throw new IllegalStateException("error, synchronizer in invalid state: " + state); + } + } + + /** + * Checks if the state is {@link #COMPLETED} or {@link #CANCELLED}. + */ + boolean isDone() { + return (getState() & (COMPLETED | CANCELLED)) != 0; + } + + /** + * Checks if the state is {@link #CANCELLED}. + */ + boolean isCancelled() { + return getState() == CANCELLED; + } + + boolean isSuccess() { + return value != null && getState() == COMPLETED; + } + + boolean isFailed() { + return exception != null && getState() == COMPLETED; + } + + /** + * Transition to the COMPLETED state and set the value. + */ + boolean set(V v) { + return complete(v, null, COMPLETED); + } + + /** + * Transition to the COMPLETED state and set the exception. + */ + boolean setException(Exception exception) { + return complete(null, exception, COMPLETED); + } + + /** + * Transition to the CANCELLED state. + */ + boolean cancel() { + return complete(null, null, CANCELLED); + } + + /** + * Implementation of completing a task. Either {@code v} or {@code t} will + * be set but not both. The {@code finalState} is the state to change to + * from {@link #RUNNING}. If the state is not in the RUNNING state we + * return {@code false} after waiting for the state to be set to a valid + * final state ({@link #COMPLETED} or {@link #CANCELLED}). + * + * @param v the value to set as the result of the computation. + * @param exception the exception to set as the result of the computation. + * @param finalState the state to transition to. + */ + private boolean complete(V v, Exception exception, int finalState) { + boolean doCompletion = compareAndSetState(RUNNING, COMPLETING); + if (doCompletion) { + // If this thread successfully transitioned to COMPLETING, set the value + // and exception and then release to the final state. + this.value = v; + this.exception = exception; + releaseShared(finalState); + } else if (getState() == COMPLETING) { + // If some other thread is currently completing the future, block until + // they are done so we can guarantee completion. + acquireShared(-1); + } + return doCompletion; + } + } +} diff --git a/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java b/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java index 6298d3d..49130a2 100644 --- a/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/ElasticsearchTest.java @@ -21,6 +21,7 @@ import org.xbib.netty.http.client.HttpRequestBuilder; import org.xbib.netty.http.client.HttpRequestContext; import java.io.IOException; +import java.net.ConnectException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.List; @@ -31,6 +32,8 @@ import java.util.logging.LogManager; import java.util.logging.Logger; import java.util.logging.SimpleFormatter; +import static org.junit.Assert.assertTrue; + /** */ public class ElasticsearchTest { @@ -56,7 +59,8 @@ public class ElasticsearchTest { public void testElasticsearchCreateDocument() throws Exception { HttpClient httpClient = HttpClient.builder() .build(); - HttpRequestContext requestContext = httpClient.preparePut() + try { + HttpRequestContext requestContext = httpClient.preparePut() .setURL("http://localhost:9200/test/test/1") .json("{\"text\":\"Hello World\"}") .onResponse(fullHttpResponse -> { @@ -66,15 +70,20 @@ public class ElasticsearchTest { .onException(e -> logger.log(Level.SEVERE, e.getMessage(), e)) .execute() .get(); + logger.log(Level.FINE, "took = " + requestContext.took()); + } catch (Exception exception) { + assertTrue(exception.getCause() instanceof ConnectException); + logger.log(Level.INFO, "got expected exception"); + } httpClient.close(); - logger.log(Level.FINE, "took = " + requestContext.took()); } @Test public void testElasticsearchMatchQuery() throws Exception { HttpClient httpClient = HttpClient.builder() .build(); - HttpRequestContext requestContext = httpClient.preparePost() + try { + HttpRequestContext requestContext = httpClient.preparePost() .setURL("http://localhost:9200/test/_search") .json("{\"query\":{\"match\":{\"_all\":\"Hello World\"}}}") .onResponse(fullHttpResponse -> { @@ -84,8 +93,12 @@ public class ElasticsearchTest { .onException(e -> logger.log(Level.SEVERE, e.getMessage(), e)) .execute() .get(); + logger.log(Level.FINE, "took = " + requestContext.took()); + } catch (Exception exception) { + assertTrue(exception.getCause() instanceof ConnectException); + logger.log(Level.INFO, "got expected exception"); + } httpClient.close(); - logger.log(Level.FINE, "took = " + requestContext.took()); } @Test @@ -103,9 +116,14 @@ public class ElasticsearchTest { } List responses = new ArrayList<>(); for (int i = 0; i < max; i++) { - responses.add(contexts.get(i).get()); + try { + responses.add(contexts.get(i).get()); + } catch (Exception exception) { + assertTrue(exception.getCause() instanceof ConnectException); + logger.log(Level.INFO, "got expected exception"); + } } - for (int i = 0; i < max; i++) { + for (int i = 0; i < responses.size(); i++) { logger.log(Level.FINE, "took = " + responses.get(i).took()); } httpClient.close(); diff --git a/src/test/java/org/xbib/netty/http/client/test/ExceptionTest.java b/src/test/java/org/xbib/netty/http/client/test/ExceptionTest.java index cfa5b57..130aca4 100644 --- a/src/test/java/org/xbib/netty/http/client/test/ExceptionTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/ExceptionTest.java @@ -18,6 +18,7 @@ package org.xbib.netty.http.client.test; import org.junit.Test; import org.xbib.netty.http.client.HttpClient; +import java.net.ConnectException; import java.nio.charset.StandardCharsets; import java.util.logging.ConsoleHandler; import java.util.logging.Handler; @@ -26,6 +27,8 @@ import java.util.logging.LogManager; import java.util.logging.Logger; import java.util.logging.SimpleFormatter; +import static org.junit.Assert.assertTrue; + /** */ public class ExceptionTest { @@ -54,15 +57,20 @@ public class ExceptionTest { HttpClient httpClient = HttpClient.builder() .build(); - httpClient.prepareGet() - .setURL("http://localhost:1234") - .onResponse(fullHttpResponse -> { - String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); - logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response); - }) - .onException(e -> logger.log(Level.SEVERE, e.getMessage(), e)) - .execute() - .get(); + try { + httpClient.prepareGet() + .setURL("http://localhost:1234") + .onResponse(fullHttpResponse -> { + String response = fullHttpResponse.content().toString(StandardCharsets.UTF_8); + logger.log(Level.INFO, "status = " + fullHttpResponse.status() + " response body = " + response); + }) + .onException(e -> logger.log(Level.SEVERE, e.getMessage(), e)) + .execute() + .get(); + } catch (Exception exception) { + assertTrue(exception.getCause() instanceof ConnectException); + logger.log(Level.INFO, "got expected exception"); + } httpClient.close(); } } diff --git a/src/test/java/org/xbib/netty/http/client/test/IndexHbzTest.java b/src/test/java/org/xbib/netty/http/client/test/IndexHbzTest.java index 580f83b..d21da8c 100644 --- a/src/test/java/org/xbib/netty/http/client/test/IndexHbzTest.java +++ b/src/test/java/org/xbib/netty/http/client/test/IndexHbzTest.java @@ -52,6 +52,7 @@ public class IndexHbzTest { private static final Logger logger = Logger.getLogger(""); + @Test public void testIndexHbz() throws Exception { HttpClient httpClient = HttpClient.builder() .build();