diff --git a/gradle.properties b/gradle.properties index 5772eeb..c792015 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,16 +1,16 @@ group = org.xbib name = netty-http -version = 4.1.63.4 +version = 4.1.65.0 gradle.wrapper.version = 6.6.1 -netty.version = 4.1.63.Final -tcnative.version = 2.0.38.Final +netty.version = 4.1.65.Final +tcnative.version = 2.0.39.Final bouncycastle.version = 1.68 -reactivestreams.version = 1.0.2 +reactivestreams.version = 1.0.3 reactivex.version = 1.3.8 -conscrypt.version = 2.5.1 -javassist.version = 3.27.0-GA +conscrypt.version = 2.5.2 +javassist.version = 3.28.0-GA jackson.version = 2.11.4 mockito.version = 3.10.0 xbib.net.version = 2.1.1 diff --git a/gradle/test/junit5.gradle b/gradle/test/junit5.gradle index f1deffa..9777b6a 100644 --- a/gradle/test/junit5.gradle +++ b/gradle/test/junit5.gradle @@ -12,6 +12,7 @@ dependencies { test { useJUnitPlatform() failFast = true + maxHeapSize '1g' systemProperty 'java.util.logging.config.file', 'src/test/resources/logging.properties' testLogging { events 'STARTED', 'PASSED', 'FAILED', 'SKIPPED' diff --git a/netty-http-client/src/test/java/org/xbib/netty/http/client/test/pool/PoolTest.java b/netty-http-client/src/test/java/org/xbib/netty/http/client/test/pool/PoolTest.java index e05dbff..8637742 100644 --- a/netty-http-client/src/test/java/org/xbib/netty/http/client/test/pool/PoolTest.java +++ b/netty-http-client/src/test/java/org/xbib/netty/http/client/test/pool/PoolTest.java @@ -34,12 +34,12 @@ class PoolTest { private static final Logger logger = Logger.getLogger(PoolTest.class.getName()); - private static final int TEST_STEP_TIME_SECONDS = 50; + private static final long TEST_STEP_TIME_SECONDS = 60L; - private static final int BATCH_SIZE = 0x100; + private static final int BATCH_SIZE = 100; @ParameterizedTest - @ValueSource(ints = {1,10,100}) + @ValueSource(ints = {1,10,25}) void testPool(int concurrencyLevel) throws InterruptedException { ConcurrentMap nodeFreq = new ConcurrentHashMap<>(); int nodecount = 2; diff --git a/netty-http-server/build.gradle b/netty-http-server/build.gradle index db603ee..2ba5c1c 100644 --- a/netty-http-server/build.gradle +++ b/netty-http-server/build.gradle @@ -7,4 +7,6 @@ dependencies { testRuntimeOnly "org.javassist:javassist:${project.property('javassist.version')}" testRuntimeOnly "org.bouncycastle:bcpkix-jdk15on:${project.property('bouncycastle.version')}" testRuntimeOnly "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}" + testRuntimeOnly project(":netty-http-epoll") + testRuntimeOnly project(":netty-http-kqueue") } diff --git a/netty-http-server/src/main/java/org/xbib/netty/http/server/Server.java b/netty-http-server/src/main/java/org/xbib/netty/http/server/Server.java index 2068642..9902bb5 100644 --- a/netty-http-server/src/main/java/org/xbib/netty/http/server/Server.java +++ b/netty-http-server/src/main/java/org/xbib/netty/http/server/Server.java @@ -29,6 +29,7 @@ import org.xbib.netty.http.server.endpoint.HttpEndpointResolver; import org.xbib.netty.http.server.security.CertificateUtils; import java.io.IOException; import java.lang.reflect.InvocationTargetException; +import java.net.BindException; import java.security.cert.CertificateExpiredException; import java.security.cert.CertificateNotYetValidException; import java.security.cert.CertificateParsingException; @@ -183,18 +184,26 @@ public final class Server implements AutoCloseable { /** * Start accepting incoming connections. * @return the channel future - * @throws IOException if channel future sync is interrupted + * @throws BindException if socket bind did not succeed */ - public ChannelFuture accept() throws IOException { - HttpAddress httpAddress = serverConfig.getAddress(); - logger.log(Level.INFO, () -> "trying to bind to " + httpAddress); + public ChannelFuture accept() throws BindException { try { - this.channelFuture = bootstrap.bind(httpAddress.getInetSocketAddress()).await().sync(); - } catch (InterruptedException e) { - throw new IOException(e); + HttpAddress httpAddress = serverConfig.getAddress(); + logger.log(Level.INFO, () -> "trying to bind to " + httpAddress); + try { + this.channelFuture = bootstrap.bind(httpAddress.getInetSocketAddress()).await().sync(); + } catch (InterruptedException e) { + throw new BindException(e.getMessage()); + } + logger.log(Level.INFO, () -> ServerName.getServerName() + " ready, listening on " + httpAddress); + return channelFuture; + } catch (Exception e) { + if (e instanceof BindException) { + throw e; + } else { + throw new BindException(e.getMessage()); + } } - logger.log(Level.INFO, () -> ServerName.getServerName() + " ready, listening on " + httpAddress); - return channelFuture; } public AtomicLong getRequestCounter() { diff --git a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/BindExceptionTest.java b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/BindExceptionTest.java index 96eb847..a794767 100644 --- a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/BindExceptionTest.java +++ b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/BindExceptionTest.java @@ -23,7 +23,6 @@ class BindExceptionTest { Server server1 = Server.builder(domain).build(); Server server2 = Server.builder(domain).build(); try { - // ATTN: when using native libraries (epoll), the bind exception will be an internal error Assertions.assertThrows(BindException.class, () ->{ ChannelFuture channelFuture1 = server1.accept(); assertNotNull(channelFuture1); diff --git a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http2/CleartextTest.java b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http2/CleartextTest.java index 3dc8ef0..9305ebe 100644 --- a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http2/CleartextTest.java +++ b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http2/CleartextTest.java @@ -169,6 +169,7 @@ class CleartextTest { } }); } + Thread.sleep(5000L); executorService.shutdown(); boolean terminated = executorService.awaitTermination(30L, TimeUnit.SECONDS); executorService.shutdownNow(); @@ -176,8 +177,8 @@ class CleartextTest { transport.get(30L, TimeUnit.SECONDS); logger.log(Level.INFO, "transport complete"); } finally { - client.shutdownGracefully(); server.shutdownGracefully(); + client.shutdownGracefully(); } logger.log(Level.INFO, "client requests = " + client.getRequestCounter() + " client responses = " + client.getResponseCounter()); @@ -255,6 +256,7 @@ class CleartextTest { } }); } + Thread.sleep(5000L); executorService.shutdown(); boolean terminated = executorService.awaitTermination(30L, TimeUnit.SECONDS); logger.log(Level.INFO, "terminated = " + terminated + ", now waiting for transport to complete"); diff --git a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http2/EncryptedTest.java b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http2/EncryptedTest.java index 26bde68..f5a01e0 100644 --- a/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http2/EncryptedTest.java +++ b/netty-http-server/src/test/java/org/xbib/netty/http/server/test/http2/EncryptedTest.java @@ -98,7 +98,8 @@ class EncryptedTest { ClientTransport transport = client.newTransport(); for (int i = 0; i < loop; i++) { String payload = 0 + "/" + i; - Request request = Request.get().setVersion("HTTP/2.0") + Request request = Request.get() + .setVersion("HTTP/2.0") .url(server.getServerConfig().getAddress().base()) .content(payload, "text/plain") .setResponseListener(responseListener) @@ -149,7 +150,8 @@ class EncryptedTest { try { for (int i = 0; i < loop; i++) { String payload = t + "/" + i; - Request request = Request.get().setVersion("HTTP/2.0") + Request request = Request.get() + .setVersion("HTTP/2.0") .url(server.getServerConfig().getAddress().base()) .content(payload, "text/plain") .setResponseListener(responseListener) @@ -165,6 +167,7 @@ class EncryptedTest { } }); } + Thread.sleep(5000L); executorService.shutdown(); boolean terminated = executorService.awaitTermination(30L, TimeUnit.SECONDS); executorService.shutdownNow(); @@ -176,4 +179,95 @@ class EncryptedTest { } assertEquals(threads * loop , counter.get()); } + + @Test + void testTwoPooledSecureHttp2() throws Exception { + int threads = 4; + int loop = 1024; + HttpAddress httpAddress1 = HttpAddress.secureHttp2("localhost", 8143); + AtomicInteger counter1 = new AtomicInteger(); + HttpServerDomain domain1 = HttpServerDomain.builder(httpAddress1) + .setSelfCert() + .singleEndpoint("/", (request, response) -> { + response.getBuilder().setStatus(HttpResponseStatus.OK.code()).setContentType("text/plain").build() + .write(request.getContent().toString(StandardCharsets.UTF_8)); + counter1.incrementAndGet(); + }) + .build(); + Server server1 = Server.builder(domain1) + .build(); + server1.accept(); + HttpAddress httpAddress2 = HttpAddress.secureHttp2("localhost", 8144); + AtomicInteger counter2 = new AtomicInteger(); + HttpServerDomain domain2 = HttpServerDomain.builder(httpAddress2) + .setSelfCert() + .singleEndpoint("/", (request, response) -> { + response.getBuilder().setStatus(HttpResponseStatus.OK.code()).setContentType("text/plain").build() + .write(request.getContent().toString(StandardCharsets.UTF_8)); + counter2.incrementAndGet(); + }) + .build(); + Server server2 = Server.builder(domain2) + .build(); + server2.accept(); + Client client = Client.builder() + .trustInsecure() + .addPoolNode(httpAddress1) + .addPoolNode(httpAddress2) + .setPoolNodeConnectionLimit(threads) + .build(); + AtomicInteger counter = new AtomicInteger(); + // a single instance of HTTP/2 response listener, always receives responses out-of-order + final ResponseListener responseListener = resp -> { + if (resp.getStatus().getCode() == HttpResponseStatus.OK.code()) { + counter.incrementAndGet(); + } else { + logger.log(Level.INFO, "response listener: headers = " + resp.getHeaders() + + " response body = " + resp.getBodyAsString(StandardCharsets.UTF_8)); + } + }; + try { + // note: for HTTP/2 only, we can use a single shared transport + final ClientTransport transport = client.newTransport(); + ExecutorService executorService = Executors.newFixedThreadPool(threads); + for (int n = 0; n < threads; n++) { + final int t = n; + executorService.submit(() -> { + try { + for (int i = 0; i < loop; i++) { + String payload = t + "/" + i; + // note that we do not set url() in the request + Request request = Request.get() + .setVersion("HTTP/2.0") + .content(payload, "text/plain") + .setResponseListener(responseListener) + .build(); + transport.execute(request); + if (transport.isFailed()) { + logger.log(Level.WARNING, transport.getFailure().getMessage(), transport.getFailure()); + break; + } + } + } catch (Throwable e) { + logger.log(Level.WARNING, e.getMessage(), e); + } + }); + } + Thread.sleep(5000L); + executorService.shutdown(); + boolean terminated = executorService.awaitTermination(30L, TimeUnit.SECONDS); + logger.log(Level.INFO, "terminated = " + terminated + ", now waiting for transport to complete"); + transport.get(30L, TimeUnit.SECONDS); + logger.log(Level.INFO, "transport complete"); + } finally { + client.shutdownGracefully(); + server1.shutdownGracefully(); + server2.shutdownGracefully(); + } + logger.log(Level.INFO, "client requests = " + client.getRequestCounter() + + " client responses = " + client.getResponseCounter()); + logger.log(Level.INFO, "counter1=" + counter1.get() + " counter2=" + counter2.get()); + logger.log(Level.INFO, "expecting=" + threads * loop + " counter=" + counter.get()); + assertEquals(threads * loop, counter.get()); + } }