better GOAWAY handling

This commit is contained in:
Jörg Prante 2024-10-10 10:31:45 +02:00
parent ab20035583
commit 3d5e2bc26a
5 changed files with 55 additions and 29 deletions

View file

@ -1,3 +1,3 @@
group = org.xbib
name = oai
version = 4.0.0
version = 4.0.1

View file

@ -26,6 +26,7 @@ test {
systemProperty 'io.netty.recycler.maxCapacityPerThread', '0'
systemProperty 'io.netty.transport.noNative', 'true'
testLogging {
showStandardStreams = true
events 'STARTED', 'PASSED', 'FAILED', 'SKIPPED'
}
afterSuite { desc, result ->

View file

@ -198,17 +198,31 @@ public class OAIClient {
.build();
logger.log(Level.INFO, "sending " + httpRequest);
if (consumer != null) {
HttpResponse<byte[]> httpResponse = httpClient.send(httpRequest, HttpResponse.BodyHandlers.ofByteArray());
int status = httpResponse.statusCode();
String contentType = httpResponse.headers().firstValue("content-type").orElse(null);
String retryAfter = httpResponse.headers().firstValue("retry-after").orElse(null);
String body = new String(httpResponse.body(), StandardCharsets.UTF_8);
listRecordsResponse.receivedResponse(body, status, contentType, retryAfter, splitWriter);
logger.log(Level.FINE, "response headers = " + httpResponse.headers() +
" resumption-token = " + listRecordsResponse.getResumptionToken());
byte[] b = httpResponse.body();
if (b.length > 0) {
consumer.accept(new ByteArrayInputStream(b));
int retrycount = 3;
HttpResponse<byte[]> httpResponse = null;
while (httpResponse == null && retrycount > 0) {
try {
httpResponse = httpClient.send(httpRequest, HttpResponse.BodyHandlers.ofByteArray());
} catch (IOException e) {
// e.g. java.io.IOException: GOAWAY received
// at java.net.http/jdk.internal.net.http.HttpClientImpl.send(Unknown Source)
logger.log(Level.WARNING, e.getMessage(), e);
retrycount--;
Thread.sleep(5000L);
}
}
if (httpResponse != null) {
int status = httpResponse.statusCode();
String contentType = httpResponse.headers().firstValue("content-type").orElse(null);
String retryAfter = httpResponse.headers().firstValue("retry-after").orElse(null);
String body = new String(httpResponse.body(), StandardCharsets.UTF_8);
listRecordsResponse.receivedResponse(body, status, contentType, retryAfter, splitWriter);
logger.log(Level.FINE, "response headers = " + httpResponse.headers() +
" resumption-token = " + listRecordsResponse.getResumptionToken());
byte[] b = httpResponse.body();
if (b.length > 0) {
consumer.accept(new ByteArrayInputStream(b));
}
}
} else {
HttpResponse<String> httpResponse = httpClient.send(httpRequest, HttpResponse.BodyHandlers.ofString());

View file

@ -18,6 +18,8 @@ import java.io.StringWriter;
import java.time.Instant;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.xml.transform.Source;
import javax.xml.transform.Transformer;
@ -28,6 +30,8 @@ import javax.xml.transform.stream.StreamResult;
public class ListRecordsResponse implements OAIResponse {
private static final Logger logger = Logger.getLogger(ListRecordsResponse.class.getName());
private final ListRecordsRequest request;
private ListRecordsFilterReader filterreader;
@ -54,35 +58,44 @@ public class ListRecordsResponse implements OAIResponse {
return error;
}
public void receivedResponse(String message, int status, String contentType, String retryAfter, SplitWriter splitWriter) throws OAIException {
if (status == 503) {
long secs = retryAfterMillis / 1000;
if (retryAfter != null) {
public void receivedResponse(String message,
int status,
String contentType,
String retryAfter,
SplitWriter splitWriter) throws OAIException {
long secs = retryAfterMillis / 1000;
if (retryAfter != null) {
if (!isDigits(retryAfter)) {
// parse RFC date, e.g. Fri, 31 Dec 1999 23:59:59 GMT
Instant instant = Instant.from(DateTimeFormatter.RFC_1123_DATE_TIME.parse(retryAfter));
secs = ChronoUnit.SECONDS.between(instant, Instant.now());
} else {
secs = Long.parseLong(retryAfter);
if (!isDigits(retryAfter)) {
// parse RFC date, e.g. Fri, 31 Dec 1999 23:59:59 GMT
Instant instant = Instant.from(DateTimeFormatter.RFC_1123_DATE_TIME.parse(retryAfter));
secs = ChronoUnit.SECONDS.between(instant, Instant.now());
}
}
request.setRetry(true);
}
if (status == 503) {
try {
if (secs > 0L) {
logger.log(Level.INFO, "waiting " + secs + " seconds");
Thread.sleep(1000 * secs);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// early return
return;
}
if (status == 429) {
} else if (status == 429) {
try {
Thread.sleep(10000L);
if (secs > 0L) {
logger.log(Level.INFO, "waiting " + secs + " seconds");
Thread.sleep(1000 * secs);
}
} catch (InterruptedException e) {
// ignore
Thread.currentThread().interrupt();
}
}
if (status != 200) {
// do not return, try to evaluate returned message if present
} else if (status != 200) {
throw new OAIException("status = " + status + " response = " + message);
}
// activate XSLT only if OAI XML content type is returned
@ -138,5 +151,4 @@ public class ListRecordsResponse implements OAIResponse {
public ResumptionToken<?> getResumptionToken() {
return filterreader != null ? filterreader.getResumptionToken() : null;
}
}

View file

@ -7,7 +7,6 @@ import org.xbib.oai.client.SplitWriter;
import java.time.Instant;
@Disabled("port is locked")
class DNBClientTest {
@Test