add exception listener und timeout listener to request

This commit is contained in:
Jörg Prante 2021-06-15 11:57:49 +02:00
parent 1b84d5276b
commit 1e2e25279c
7 changed files with 110 additions and 16 deletions

View file

@ -1,6 +1,6 @@
group = org.xbib
name = netty-http
version = 4.1.65.0
version = 4.1.65.1
gradle.wrapper.version = 6.6.1

View file

@ -20,6 +20,7 @@ import org.xbib.netty.http.common.HttpResponse;
import org.xbib.netty.http.common.cookie.Cookie;
import org.xbib.netty.http.common.util.CaseInsensitiveParameters;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.ZoneOffset;
@ -39,7 +40,7 @@ import java.util.concurrent.CompletableFuture;
/**
* HTTP client request.
*/
public final class Request {
public final class Request implements AutoCloseable {
private final URL url;
@ -71,10 +72,16 @@ public final class Request {
private ResponseListener<HttpResponse> responseListener;
private ExceptionListener exceptionListener;
private TimeoutListener timeoutListener;
private Request(URL url, HttpVersion httpVersion, HttpMethod httpMethod,
HttpHeaders headers, Collection<Cookie> cookies, ByteBuf content, List<InterfaceHttpData> bodyData,
long timeoutInMillis, boolean followRedirect, int maxRedirect, int redirectCount,
boolean isBackOff, BackOff backOff, ResponseListener<HttpResponse> responseListener) {
boolean isBackOff, BackOff backOff,
ResponseListener<HttpResponse> responseListener, ExceptionListener exceptionListener,
TimeoutListener timeoutListener) {
this.url = url;
this.httpVersion = httpVersion;
this.httpMethod = httpMethod;
@ -89,6 +96,8 @@ public final class Request {
this.isBackOff = isBackOff;
this.backOff = backOff;
this.responseListener = responseListener;
this.exceptionListener = exceptionListener;
this.timeoutListener = timeoutListener;
}
public URL url() {
@ -165,6 +174,11 @@ public final class Request {
}
}
@Override
public void close() throws IOException {
release();
}
@Override
public String toString() {
return "Request[url=" + url +
@ -199,6 +213,26 @@ public final class Request {
}
}
public void setExceptionListener(ExceptionListener exceptionListener) {
this.exceptionListener = exceptionListener;
}
public void onException(Throwable throwable) {
if (exceptionListener != null) {
exceptionListener.onException(throwable);
}
}
public void setTimeoutListener(TimeoutListener timeoutListener) {
this.timeoutListener = timeoutListener;
}
public void onTimeout() {
if (timeoutListener != null) {
timeoutListener.onTimeout(this);
}
}
public static Builder get() {
return builder(HttpMethod.GET);
}
@ -318,6 +352,10 @@ public final class Request {
private ResponseListener<HttpResponse> responseListener;
private ExceptionListener exceptionListener;
private TimeoutListener timeoutListener;
Builder(ByteBufAllocator allocator) {
this.allocator = allocator;
this.httpMethod = DEFAULT_METHOD;
@ -574,6 +612,16 @@ public final class Request {
return this;
}
public Builder setExceptionListener(ExceptionListener exceptionListener) {
this.exceptionListener = exceptionListener;
return this;
}
public Builder setTimeoutListener(TimeoutListener timeoutListener) {
this.timeoutListener = timeoutListener;
return this;
}
public Request build() {
DefaultHttpHeaders validatedHeaders = new DefaultHttpHeaders(true);
validatedHeaders.set(headers);
@ -622,7 +670,7 @@ public final class Request {
}
return new Request(url, httpVersion, httpMethod, validatedHeaders, cookies, content, bodyData,
timeoutInMillis, followRedirect, maxRedirects, 0, enableBackOff, backOff,
responseListener);
responseListener, exceptionListener, timeoutListener);
}
private void addHeader(AsciiString name, Object value) {

View file

@ -0,0 +1,7 @@
package org.xbib.netty.http.client.api;
@FunctionalInterface
public interface TimeoutListener {
void onTimeout(Request request);
}

View file

@ -32,6 +32,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -76,7 +77,7 @@ public abstract class BaseTransport implements ClientTransport {
}
/**
* Method for executing in a wrapping completable future.
* Method for executing the request and respond in a completable future.
*
* @param request request
* @param supplier supplier
@ -96,6 +97,8 @@ public abstract class BaseTransport implements ClientTransport {
}
close();
});
request.setTimeoutListener(req -> completableFuture.completeExceptionally(new TimeoutException()));
request.setExceptionListener(completableFuture::completeExceptionally);
execute(request);
return completableFuture;
}
@ -153,8 +156,15 @@ public abstract class BaseTransport implements ClientTransport {
for (Integer key : flow.keys()) {
String requestKey = getRequestKey(entry.getKey(), key);
try {
flow.get(key).get(value, timeUnit);
completeRequest(requestKey);
CompletableFuture<Boolean> timeoutFuture = flow.get(key);
Boolean timeout = timeoutFuture.get(value, timeUnit);
if (timeout) {
completeRequest(requestKey);
} else {
completeRequestTimeout(requestKey, new TimeoutException());
}
} catch (TimeoutException e) {
completeRequestTimeout(requestKey, new TimeoutException());
} catch (Exception e) {
completeRequestExceptionally(requestKey, e);
flow.fail(e);
@ -360,8 +370,17 @@ public abstract class BaseTransport implements ClientTransport {
private void completeRequestExceptionally(String requestKey, Throwable throwable) {
if (requestKey != null) {
Request request = requests.get(requestKey);
if (request != null && request.getCompletableFuture() != null) {
request.getCompletableFuture().completeExceptionally(throwable);
if (request != null) {
request.onException(throwable);
}
}
}
private void completeRequestTimeout(String requestKey, TimeoutException timeoutException) {
if (requestKey != null) {
Request request = requests.get(requestKey);
if (request != null) {
request.onTimeout();
}
}
}

View file

@ -82,6 +82,25 @@ class Http1Test {
}
}
@Test
void testHttpsGetRequestHebisSRU() throws Exception {
Client client = Client.builder()
.enableDebug()
.build();
try {
Request request = Request.get()
.url("http://sru.hebis.de/sru/DB=2.1?version=1.1&operation=searchRetrieve&recordSchema=marc21&query=prs%20=%20Smith&startRecord=1&maximumRecords=10")
.setResponseListener(resp -> logger.log(Level.INFO,
"got response: " + resp.getHeaders() +
resp.getBodyAsString(StandardCharsets.UTF_8) +
" status=" + resp.getStatus()))
.build();
client.execute(request).get().close();
} finally {
client.shutdownGracefully();
}
}
@Test
void testSequentialRequests() throws Exception {

View file

@ -20,15 +20,15 @@ class XbibTest {
@Test
void testXbibOrgWithDefaults() throws IOException {
Client client = new Client();
Client client = Client.builder().build();
try {
Request request = Request.get().url("https://xbib.org")
.setResponseListener(resp -> {
logger.log(Level.INFO, "status = " + resp.getStatus() +
" response = " + resp.getBodyAsString(StandardCharsets.UTF_8));
})
.setResponseListener(resp -> logger.log(Level.INFO, "status = " + resp.getStatus() +
" response = " + resp.getBodyAsString(StandardCharsets.UTF_8)))
.setTimeoutListener(req -> logger.log(Level.WARNING, "timeout!"))
.setExceptionListener(throwable -> logger.log(Level.SEVERE, throwable.getMessage(), throwable))
.build();
client.execute(request);
client.execute(request).close();
} finally {
client.shutdownGracefully();
}

View file

@ -38,7 +38,8 @@ class CleartextTest {
.setContentType("text/plain").build()
.write(request.getContent().toString(StandardCharsets.UTF_8)))
.build();
Server server = Server.builder(domain).build();
Server server = Server.builder(domain)
.build();
server.accept();
Client client = Client.builder()
.build();