update to netty 4.1.65, tcnative to 2.0.39, add some wait time for threadpools, add kqueue/epoll to tests

This commit is contained in:
Jörg Prante 2021-05-21 22:26:42 +02:00
parent c537e8b6d4
commit 1b84d5276b
8 changed files with 129 additions and 22 deletions

View file

@ -1,16 +1,16 @@
group = org.xbib
name = netty-http
version = 4.1.63.4
version = 4.1.65.0
gradle.wrapper.version = 6.6.1
netty.version = 4.1.63.Final
tcnative.version = 2.0.38.Final
netty.version = 4.1.65.Final
tcnative.version = 2.0.39.Final
bouncycastle.version = 1.68
reactivestreams.version = 1.0.2
reactivestreams.version = 1.0.3
reactivex.version = 1.3.8
conscrypt.version = 2.5.1
javassist.version = 3.27.0-GA
conscrypt.version = 2.5.2
javassist.version = 3.28.0-GA
jackson.version = 2.11.4
mockito.version = 3.10.0
xbib.net.version = 2.1.1

View file

@ -12,6 +12,7 @@ dependencies {
test {
useJUnitPlatform()
failFast = true
maxHeapSize '1g'
systemProperty 'java.util.logging.config.file', 'src/test/resources/logging.properties'
testLogging {
events 'STARTED', 'PASSED', 'FAILED', 'SKIPPED'

View file

@ -34,12 +34,12 @@ class PoolTest {
private static final Logger logger = Logger.getLogger(PoolTest.class.getName());
private static final int TEST_STEP_TIME_SECONDS = 50;
private static final long TEST_STEP_TIME_SECONDS = 60L;
private static final int BATCH_SIZE = 0x100;
private static final int BATCH_SIZE = 100;
@ParameterizedTest
@ValueSource(ints = {1,10,100})
@ValueSource(ints = {1,10,25})
void testPool(int concurrencyLevel) throws InterruptedException {
ConcurrentMap<HttpAddress, LongAdder> nodeFreq = new ConcurrentHashMap<>();
int nodecount = 2;

View file

@ -7,4 +7,6 @@ dependencies {
testRuntimeOnly "org.javassist:javassist:${project.property('javassist.version')}"
testRuntimeOnly "org.bouncycastle:bcpkix-jdk15on:${project.property('bouncycastle.version')}"
testRuntimeOnly "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}"
testRuntimeOnly project(":netty-http-epoll")
testRuntimeOnly project(":netty-http-kqueue")
}

View file

@ -29,6 +29,7 @@ import org.xbib.netty.http.server.endpoint.HttpEndpointResolver;
import org.xbib.netty.http.server.security.CertificateUtils;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.BindException;
import java.security.cert.CertificateExpiredException;
import java.security.cert.CertificateNotYetValidException;
import java.security.cert.CertificateParsingException;
@ -183,18 +184,26 @@ public final class Server implements AutoCloseable {
/**
* Start accepting incoming connections.
* @return the channel future
* @throws IOException if channel future sync is interrupted
* @throws BindException if socket bind did not succeed
*/
public ChannelFuture accept() throws IOException {
public ChannelFuture accept() throws BindException {
try {
HttpAddress httpAddress = serverConfig.getAddress();
logger.log(Level.INFO, () -> "trying to bind to " + httpAddress);
try {
this.channelFuture = bootstrap.bind(httpAddress.getInetSocketAddress()).await().sync();
} catch (InterruptedException e) {
throw new IOException(e);
throw new BindException(e.getMessage());
}
logger.log(Level.INFO, () -> ServerName.getServerName() + " ready, listening on " + httpAddress);
return channelFuture;
} catch (Exception e) {
if (e instanceof BindException) {
throw e;
} else {
throw new BindException(e.getMessage());
}
}
}
public AtomicLong getRequestCounter() {

View file

@ -23,7 +23,6 @@ class BindExceptionTest {
Server server1 = Server.builder(domain).build();
Server server2 = Server.builder(domain).build();
try {
// ATTN: when using native libraries (epoll), the bind exception will be an internal error
Assertions.assertThrows(BindException.class, () ->{
ChannelFuture channelFuture1 = server1.accept();
assertNotNull(channelFuture1);

View file

@ -169,6 +169,7 @@ class CleartextTest {
}
});
}
Thread.sleep(5000L);
executorService.shutdown();
boolean terminated = executorService.awaitTermination(30L, TimeUnit.SECONDS);
executorService.shutdownNow();
@ -176,8 +177,8 @@ class CleartextTest {
transport.get(30L, TimeUnit.SECONDS);
logger.log(Level.INFO, "transport complete");
} finally {
client.shutdownGracefully();
server.shutdownGracefully();
client.shutdownGracefully();
}
logger.log(Level.INFO, "client requests = " + client.getRequestCounter() +
" client responses = " + client.getResponseCounter());
@ -255,6 +256,7 @@ class CleartextTest {
}
});
}
Thread.sleep(5000L);
executorService.shutdown();
boolean terminated = executorService.awaitTermination(30L, TimeUnit.SECONDS);
logger.log(Level.INFO, "terminated = " + terminated + ", now waiting for transport to complete");

View file

@ -98,7 +98,8 @@ class EncryptedTest {
ClientTransport transport = client.newTransport();
for (int i = 0; i < loop; i++) {
String payload = 0 + "/" + i;
Request request = Request.get().setVersion("HTTP/2.0")
Request request = Request.get()
.setVersion("HTTP/2.0")
.url(server.getServerConfig().getAddress().base())
.content(payload, "text/plain")
.setResponseListener(responseListener)
@ -149,7 +150,8 @@ class EncryptedTest {
try {
for (int i = 0; i < loop; i++) {
String payload = t + "/" + i;
Request request = Request.get().setVersion("HTTP/2.0")
Request request = Request.get()
.setVersion("HTTP/2.0")
.url(server.getServerConfig().getAddress().base())
.content(payload, "text/plain")
.setResponseListener(responseListener)
@ -165,6 +167,7 @@ class EncryptedTest {
}
});
}
Thread.sleep(5000L);
executorService.shutdown();
boolean terminated = executorService.awaitTermination(30L, TimeUnit.SECONDS);
executorService.shutdownNow();
@ -176,4 +179,95 @@ class EncryptedTest {
}
assertEquals(threads * loop , counter.get());
}
@Test
void testTwoPooledSecureHttp2() throws Exception {
int threads = 4;
int loop = 1024;
HttpAddress httpAddress1 = HttpAddress.secureHttp2("localhost", 8143);
AtomicInteger counter1 = new AtomicInteger();
HttpServerDomain domain1 = HttpServerDomain.builder(httpAddress1)
.setSelfCert()
.singleEndpoint("/", (request, response) -> {
response.getBuilder().setStatus(HttpResponseStatus.OK.code()).setContentType("text/plain").build()
.write(request.getContent().toString(StandardCharsets.UTF_8));
counter1.incrementAndGet();
})
.build();
Server server1 = Server.builder(domain1)
.build();
server1.accept();
HttpAddress httpAddress2 = HttpAddress.secureHttp2("localhost", 8144);
AtomicInteger counter2 = new AtomicInteger();
HttpServerDomain domain2 = HttpServerDomain.builder(httpAddress2)
.setSelfCert()
.singleEndpoint("/", (request, response) -> {
response.getBuilder().setStatus(HttpResponseStatus.OK.code()).setContentType("text/plain").build()
.write(request.getContent().toString(StandardCharsets.UTF_8));
counter2.incrementAndGet();
})
.build();
Server server2 = Server.builder(domain2)
.build();
server2.accept();
Client client = Client.builder()
.trustInsecure()
.addPoolNode(httpAddress1)
.addPoolNode(httpAddress2)
.setPoolNodeConnectionLimit(threads)
.build();
AtomicInteger counter = new AtomicInteger();
// a single instance of HTTP/2 response listener, always receives responses out-of-order
final ResponseListener<HttpResponse> responseListener = resp -> {
if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) {
counter.incrementAndGet();
} else {
logger.log(Level.INFO, "response listener: headers = " + resp.getHeaders() +
" response body = " + resp.getBodyAsString(StandardCharsets.UTF_8));
}
};
try {
// note: for HTTP/2 only, we can use a single shared transport
final ClientTransport transport = client.newTransport();
ExecutorService executorService = Executors.newFixedThreadPool(threads);
for (int n = 0; n < threads; n++) {
final int t = n;
executorService.submit(() -> {
try {
for (int i = 0; i < loop; i++) {
String payload = t + "/" + i;
// note that we do not set url() in the request
Request request = Request.get()
.setVersion("HTTP/2.0")
.content(payload, "text/plain")
.setResponseListener(responseListener)
.build();
transport.execute(request);
if (transport.isFailed()) {
logger.log(Level.WARNING, transport.getFailure().getMessage(), transport.getFailure());
break;
}
}
} catch (Throwable e) {
logger.log(Level.WARNING, e.getMessage(), e);
}
});
}
Thread.sleep(5000L);
executorService.shutdown();
boolean terminated = executorService.awaitTermination(30L, TimeUnit.SECONDS);
logger.log(Level.INFO, "terminated = " + terminated + ", now waiting for transport to complete");
transport.get(30L, TimeUnit.SECONDS);
logger.log(Level.INFO, "transport complete");
} finally {
client.shutdownGracefully();
server1.shutdownGracefully();
server2.shutdownGracefully();
}
logger.log(Level.INFO, "client requests = " + client.getRequestCounter() +
" client responses = " + client.getResponseCounter());
logger.log(Level.INFO, "counter1=" + counter1.get() + " counter2=" + counter2.get());
logger.log(Level.INFO, "expecting=" + threads * loop + " counter=" + counter.get());
assertEquals(threads * loop, counter.get());
}
}