split bouncycastle, epoll, kqueue into separate modules, downgrade tcnative to macos 10.12 compatibility

This commit is contained in:
Jörg Prante 2019-12-22 02:03:19 +01:00
parent 5490eec9f8
commit aff174ab22
29 changed files with 314 additions and 325 deletions

View file

@ -38,7 +38,9 @@ subprojects {
jar { jar {
manifest { manifest {
attributes('Implementation-Title': project.name)
attributes('Implementation-Version': project.version) attributes('Implementation-Version': project.version)
attributes('Implementation-Vendor': 'Jörg Prante')
} }
} }

View file

@ -1,10 +1,11 @@
group = org.xbib group = org.xbib
name = netty-http name = netty-http
version = 4.1.43.1 version = 4.1.44.0
# netty # netty
netty.version = 4.1.43.Final netty.version = 4.1.44.Final
tcnative.version = 2.0.25.Final tcnative.version = 2.0.28.Final
tcnative-legacy-macosx.version = 2.0.26.Final
# for netty-http-common # for netty-http-common
xbib-net-url.version = 2.0.3 xbib-net-url.version = 2.0.3
@ -21,7 +22,7 @@ xbib-guice.version = 4.0.4
# for rx # for rx
reactivex.version = 1.2.10 reactivex.version = 1.2.10
# test # for test
junit.version = 5.5.2 junit.version = 5.5.2
junit4.version = 4.12 junit4.version = 4.12
conscrypt.version = 2.2.1 conscrypt.version = 2.2.1

View file

@ -0,0 +1,4 @@
dependencies {
compile project(":netty-http-common")
compile "org.bouncycastle:bcpkix-jdk15on:${project.property('bouncycastle.version')}"
}

View file

@ -0,0 +1,34 @@
package org.xbib.netty.http.bouncycastle;
import org.xbib.netty.http.common.ServerCertificateProvider;
import java.io.InputStream;
import java.security.SecureRandom;
public class BouncyCastleSelfSignedCertificateProvider implements ServerCertificateProvider {
private final SelfSignedCertificate selfSignedCertificate;
public BouncyCastleSelfSignedCertificateProvider() {
this.selfSignedCertificate = new SelfSignedCertificate();
}
@Override
public void prepare(String fqdn) throws Exception {
selfSignedCertificate.generate(fqdn, new SecureRandom(), 2048);
}
@Override
public InputStream getPrivateKey() {
return selfSignedCertificate.privateKey();
}
@Override
public InputStream getCertificateChain() {
return selfSignedCertificate.certificate();
}
@Override
public String getKeyPassword() {
return null;
}
}

View file

@ -1,4 +1,4 @@
package org.xbib.netty.http.server.security.tls; package org.xbib.netty.http.bouncycastle;
import org.bouncycastle.asn1.x500.X500Name; import org.bouncycastle.asn1.x500.X500Name;
import org.bouncycastle.asn1.x509.AlgorithmIdentifier; import org.bouncycastle.asn1.x509.AlgorithmIdentifier;
@ -58,16 +58,6 @@ public final class SelfSignedCertificate {
private PrivateKey key; private PrivateKey key;
public SelfSignedCertificate()
throws IOException, NoSuchProviderException, NoSuchAlgorithmException, OperatorCreationException {
this("localhost");
}
public SelfSignedCertificate(String fqdn)
throws IOException, NoSuchProviderException, NoSuchAlgorithmException, OperatorCreationException {
generate(fqdn, new SecureRandom(), 2048);
}
/** /**
* Creates a new instance. * Creates a new instance.
* *

View file

@ -0,0 +1 @@
org.xbib.netty.http.bouncycastle.BouncyCastleSelfSignedCertificateProvider

View file

@ -1,9 +1,9 @@
package org.xbib.netty.http.server.test; package org.xbib.netty.http.bouncycastle;
import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.xbib.netty.http.server.security.tls.SelfSignedCertificate;
import java.security.SecureRandom;
import java.security.Security; import java.security.Security;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -12,7 +12,8 @@ class SelfSignedCertificateTest {
@Test @Test
void testSelfSignedCertificate() throws Exception { void testSelfSignedCertificate() throws Exception {
Security.addProvider(new BouncyCastleProvider()); Security.addProvider(new BouncyCastleProvider());
SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate("localhost"); SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate();
selfSignedCertificate.generate("localhost", new SecureRandom(), 2048);
selfSignedCertificate.exportPEM(Logger.getLogger("test")); selfSignedCertificate.exportPEM(Logger.getLogger("test"));
} }
} }

View file

@ -1,10 +1,17 @@
import org.apache.tools.ant.taskdefs.condition.Os
dependencies { dependencies {
compile project(":netty-http-client-api") compile project(":netty-http-client-api")
compile "io.netty:netty-handler-proxy:${project.property('netty.version')}" compile "io.netty:netty-handler-proxy:${project.property('netty.version')}"
compile "io.netty:netty-transport-native-epoll:${project.property('netty.version')}" runtime "org.bouncycastle:bcpkix-jdk15on:${project.property('bouncycastle.version')}"
testCompile "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}" // we are an Mac OS X 10.12, but tcnative > 2.0.26 is for 10.13+
testCompile "org.bouncycastle:bcpkix-jdk15on:${project.property('bouncycastle.version')}" if (Os.isFamily(Os.FAMILY_MAC)) {
runtime "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative-legacy-macosx.version')}"
runtime project(':netty-http-kqueue')
} else {
runtime "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}"
runtime project(':netty-http-epoll')
}
testCompile "org.conscrypt:conscrypt-openjdk-uber:${project.property('conscrypt.version')}" testCompile "org.conscrypt:conscrypt-openjdk-uber:${project.property('conscrypt.version')}"
testCompile "com.fasterxml.jackson.core:jackson-databind:${project.property('jackson.version')}" testCompile "com.fasterxml.jackson.core:jackson-databind:${project.property('jackson.version')}"
} }

View file

@ -6,8 +6,6 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.pool.ChannelPoolHandler; import io.netty.channel.pool.ChannelPoolHandler;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
@ -32,6 +30,7 @@ import org.xbib.netty.http.client.api.Transport;
import org.xbib.netty.http.common.HttpAddress; import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.common.HttpResponse; import org.xbib.netty.http.common.HttpResponse;
import org.xbib.netty.http.common.NetworkUtils; import org.xbib.netty.http.common.NetworkUtils;
import org.xbib.netty.http.common.TransportProvider;
import org.xbib.netty.http.common.security.SecurityUtil; import org.xbib.netty.http.common.security.SecurityUtil;
import javax.net.ssl.SNIHostName; import javax.net.ssl.SNIHostName;
@ -87,10 +86,6 @@ public final class Client implements AutoCloseable {
private final ByteBufAllocator byteBufAllocator; private final ByteBufAllocator byteBufAllocator;
private final EventLoopGroup eventLoopGroup;
private final Class<? extends SocketChannel> socketChannelClass;
private final Bootstrap bootstrap; private final Bootstrap bootstrap;
private final Queue<Transport> transports; private final Queue<Transport> transports;
@ -99,6 +94,10 @@ public final class Client implements AutoCloseable {
private final AtomicBoolean closed; private final AtomicBoolean closed;
private EventLoopGroup eventLoopGroup;
private Class<? extends SocketChannel> socketChannelClass;
private BoundedChannelPool<HttpAddress> pool; private BoundedChannelPool<HttpAddress> pool;
public Client() { public Client() {
@ -125,13 +124,39 @@ public final class Client implements AutoCloseable {
} }
} }
initializeTrustManagerFactory(clientConfig); initializeTrustManagerFactory(clientConfig);
this.byteBufAllocator = byteBufAllocator != null ? this.byteBufAllocator = byteBufAllocator != null ? byteBufAllocator : ByteBufAllocator.DEFAULT;
byteBufAllocator : ByteBufAllocator.DEFAULT; if (eventLoopGroup != null) {
this.eventLoopGroup = eventLoopGroup != null ? eventLoopGroup : clientConfig.isEpoll() ? this.eventLoopGroup = eventLoopGroup;
new EpollEventLoopGroup(clientConfig.getThreadCount(), new HttpClientThreadFactory()) : } else {
new NioEventLoopGroup(clientConfig.getThreadCount(), new HttpClientThreadFactory()); ServiceLoader<TransportProvider> transportProviders = ServiceLoader.load(TransportProvider.class);
this.socketChannelClass = socketChannelClass != null ? socketChannelClass : clientConfig.isEpoll() ? for (TransportProvider transportProvider : transportProviders) {
EpollSocketChannel.class : NioSocketChannel.class; if (clientConfig.getTransportProviderName() == null || clientConfig.getTransportProviderName().equals(transportProvider.getClass().getName())) {
this.eventLoopGroup = transportProvider.createEventLoopGroup(clientConfig.getThreadCount(), new HttpClientThreadFactory());
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "transport provider event loop group: " + this.eventLoopGroup.getClass().getName());
}
}
}
}
if (this.eventLoopGroup == null) {
this.eventLoopGroup = new NioEventLoopGroup(clientConfig.getThreadCount(), new HttpClientThreadFactory());
}
if (socketChannelClass != null) {
this.socketChannelClass = socketChannelClass;
} else {
ServiceLoader<TransportProvider> transportProviders = ServiceLoader.load(TransportProvider.class);
for (TransportProvider transportProvider : transportProviders) {
if (clientConfig.getTransportProviderName() == null || clientConfig.getTransportProviderName().equals(transportProvider.getClass().getName())) {
this.socketChannelClass = transportProvider.createSocketChannelClass();
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "transport provider channel: " + this.socketChannelClass.getName());
}
}
}
}
if (this.socketChannelClass == null) {
this.socketChannelClass = NioSocketChannel.class;
}
this.bootstrap = new Bootstrap() this.bootstrap = new Bootstrap()
.group(this.eventLoopGroup) .group(this.eventLoopGroup)
.channel(this.socketChannelClass) .channel(this.socketChannelClass)
@ -567,6 +592,11 @@ public final class Client implements AutoCloseable {
return this; return this;
} }
public Builder setTransportProviderName(String transportProviderName) {
clientConfig.setTransportProviderName(transportProviderName);
return this;
}
public Builder setEventLoop(EventLoopGroup eventLoopGroup) { public Builder setEventLoop(EventLoopGroup eventLoopGroup) {
this.eventLoopGroup = eventLoopGroup; this.eventLoopGroup = eventLoopGroup;
return this; return this;

View file

@ -1,7 +1,6 @@
package org.xbib.netty.http.client; package org.xbib.netty.http.client;
import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.Epoll;
import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http2.Http2Settings; import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LogLevel;
@ -35,9 +34,9 @@ public class ClientConfig {
LogLevel DEFAULT_DEBUG_LOG_LEVEL = LogLevel.DEBUG; LogLevel DEFAULT_DEBUG_LOG_LEVEL = LogLevel.DEBUG;
/** /**
* The default for selecting epoll. If available, select epoll. * The transport provider
*/ */
boolean EPOLL = Epoll.isAvailable(); String DEFAULT_TRANSPORT_PROVIDER = null;
/** /**
* If set to 0, then Netty will decide about thread count. * If set to 0, then Netty will decide about thread count.
@ -178,7 +177,7 @@ public class ClientConfig {
private LogLevel debugLogLevel = Defaults.DEFAULT_DEBUG_LOG_LEVEL; private LogLevel debugLogLevel = Defaults.DEFAULT_DEBUG_LOG_LEVEL;
private boolean epoll = Defaults.EPOLL; private String transportProviderName = Defaults.DEFAULT_TRANSPORT_PROVIDER;
private int threadCount = Defaults.THREAD_COUNT; private int threadCount = Defaults.THREAD_COUNT;
@ -282,18 +281,13 @@ public class ClientConfig {
return debugLogLevel; return debugLogLevel;
} }
public ClientConfig enableEpoll() { public ClientConfig setTransportProviderName(String transportProviderName) {
this.epoll = true; this.transportProviderName = transportProviderName;
return this; return this;
} }
public ClientConfig disableEpoll() { public String getTransportProviderName() {
this.epoll = false; return transportProviderName;
return this;
}
public boolean isEpoll() {
return epoll;
} }
public ClientConfig setThreadCount(int threadCount) { public ClientConfig setThreadCount(int threadCount) {

View file

@ -1,10 +1,8 @@
package org.xbib.netty.http.client.test; package org.xbib.netty.http.client.test;
import org.bouncycastle.jce.provider.BouncyCastleProvider;
import org.junit.jupiter.api.extension.BeforeAllCallback; import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext; import org.junit.jupiter.api.extension.ExtensionContext;
import java.security.Security;
import java.util.logging.ConsoleHandler; import java.util.logging.ConsoleHandler;
import java.util.logging.Handler; import java.util.logging.Handler;
import java.util.logging.Level; import java.util.logging.Level;
@ -16,9 +14,9 @@ public class NettyHttpTestExtension implements BeforeAllCallback {
@Override @Override
public void beforeAll(ExtensionContext context) { public void beforeAll(ExtensionContext context) {
if (Security.getProvider("BC") == null) { //if (Security.getProvider("BC") == null) {
Security.addProvider(new BouncyCastleProvider()); // Security.addProvider(new BouncyCastleProvider());
} //}
System.setProperty("io.netty.noUnsafe", Boolean.toString(true)); System.setProperty("io.netty.noUnsafe", Boolean.toString(true));
// System.setProperty("io.netty.leakDetection.level", "paranoid"); // System.setProperty("io.netty.leakDetection.level", "paranoid");
Level level = Level.INFO; Level level = Level.INFO;

View file

@ -1,17 +0,0 @@
package org.xbib.netty.http.client.test.pool;
import io.netty.channel.epoll.Epoll;
import org.junit.jupiter.api.extension.ConditionEvaluationResult;
import org.junit.jupiter.api.extension.ExecutionCondition;
import org.junit.jupiter.api.extension.ExtensionContext;
public class DisableTestCondition implements ExecutionCondition {
@Override
public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context) {
if (Epoll.isAvailable()) {
return ConditionEvaluationResult.enabled("Test enabled");
} else {
return ConditionEvaluationResult.disabled("Test disabled");
}
}
}

View file

@ -1,14 +0,0 @@
package org.xbib.netty.http.client.test.pool;
import org.junit.jupiter.api.extension.ExtendWith;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@ExtendWith(DisableTestCondition.class)
public @interface DisabledIfEpolllNotAvailable {
}

View file

@ -1,184 +0,0 @@
package org.xbib.netty.http.client.test.pool;
import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpVersion;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.client.api.Pool;
import org.xbib.netty.http.client.pool.BoundedChannelPool;
import java.io.Closeable;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.LongAdder;
import java.util.logging.Level;
import java.util.logging.Logger;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@DisabledIfEpolllNotAvailable
class EpollTest {
private static final Logger logger = Logger.getLogger(EpollTest.class.getName());
private static final int CONCURRENCY = 4;
private static final List<HttpAddress> NODES =
Collections.singletonList(HttpAddress.http1("localhost", 12345));
private static final long TEST_TIME_SECONDS = 100;
private static final int ATTEMPTS = 1_000;
private static final int FAIL_EVERY_ATTEMPT = 10;
private static final ByteBuf PAYLOAD = Unpooled.directBuffer(0x1000).writeZero(0x1000);
private MockEpollServer mockEpollServer;
private Pool<Channel> channelPool;
private EventLoopGroup eventLoopGroup;
@BeforeAll
void setUp() throws Exception {
mockEpollServer = new MockEpollServer(12345, FAIL_EVERY_ATTEMPT);
Semaphore semaphore = new Semaphore(CONCURRENCY);
eventLoopGroup = new EpollEventLoopGroup();
Bootstrap bootstrap = new Bootstrap()
.group(eventLoopGroup)
.channel(EpollSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
socketChannel.pipeline().addLast(new DummyClientChannelHandler());
}
})
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.SO_REUSEADDR, true)
.option(ChannelOption.TCP_NODELAY, true);
channelPool = new BoundedChannelPool<>(semaphore, HttpVersion.HTTP_1_1,
NODES, bootstrap, null, 0, BoundedChannelPool.PoolKeySelectorType.ROUNDROBIN);
channelPool.prepare(CONCURRENCY);
}
@AfterAll
void tearDown() throws Exception {
channelPool.close();
eventLoopGroup.shutdownGracefully();
mockEpollServer.close();
}
@Test
void testPoolEpoll() throws Exception {
LongAdder longAdder = new LongAdder();
ExecutorService executor = Executors.newFixedThreadPool(CONCURRENCY);
for(int i = 0; i < CONCURRENCY; i ++) {
executor.submit(() -> {
Channel channel;
for(int j = 0; j < ATTEMPTS; j ++) {
try {
while ((channel = channelPool.acquire()) == null) {
Thread.sleep(1); // very short?
}
channel.writeAndFlush(PAYLOAD.retain()).sync();
channelPool.release(channel, false);
longAdder.increment();
} catch (InterruptedException e) {
break;
} catch (Throwable cause) {
logger.log(Level.WARNING, cause.getMessage(), cause);
}
}
});
}
executor.shutdown();
executor.awaitTermination(TEST_TIME_SECONDS, TimeUnit.SECONDS);
assertTrue(executor.isTerminated());
assertEquals(CONCURRENCY * ATTEMPTS, longAdder.sum(),
2 * CONCURRENCY * ATTEMPTS / FAIL_EVERY_ATTEMPT);
}
class DummyClientChannelHandler extends SimpleChannelInboundHandler<Object> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
logger.log(Level.WARNING, cause.getMessage(), cause);
}
}
class MockEpollServer implements Closeable {
private final EventLoopGroup dispatchGroup;
private final EventLoopGroup workerGroup;
private final ChannelFuture bindFuture;
private final AtomicLong reqCounter;
MockEpollServer(int port, int dropEveryRequest) throws InterruptedException {
dispatchGroup = new EpollEventLoopGroup();
workerGroup = new EpollEventLoopGroup();
reqCounter = new AtomicLong(0);
ServerBootstrap bootstrap = new ServerBootstrap()
.group(dispatchGroup, workerGroup)
.channel(EpollServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
if (dropEveryRequest > 0) {
ch.pipeline().addLast(new SimpleChannelInboundHandler<Object>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
if (reqCounter.incrementAndGet() % dropEveryRequest == 0) {
Channel channel = ctx.channel();
logger.log(Level.INFO,"dropping the connection " + channel);
channel.close();
}
}
});
}
}
});
bindFuture = bootstrap.bind(port).sync();
}
@Override
public void close() {
bindFuture.channel().close();
workerGroup.shutdownGracefully();
dispatchGroup.shutdownGracefully();
}
}
}

View file

@ -0,0 +1,32 @@
package org.xbib.netty.http.common;
import java.io.InputStream;
public interface ServerCertificateProvider {
/**
* Prepare the server certificate, if appropriate.
*
* @param fqdn the full qualified domain name.
*/
void prepare(String fqdn) throws Exception;
/**
* Returns the generated RSA private key file in PEM format.
* @return input stream of private key
*/
InputStream getPrivateKey();
/**
* Returns the generated X.509 certificate file in PEM format.
* @return input stream of certificate
*/
InputStream getCertificateChain();
/**
* A key password or null if key password is not required.
* @return key password
*/
String getKeyPassword();
}

View file

@ -0,0 +1,16 @@
package org.xbib.netty.http.common;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import java.util.concurrent.ThreadFactory;
public interface TransportProvider {
EventLoopGroup createEventLoopGroup(int nThreads, ThreadFactory threadFactory);
Class<? extends SocketChannel> createSocketChannelClass();
Class<? extends ServerSocketChannel> createServerSocketChannelClass();
}

View file

@ -0,0 +1,4 @@
dependencies {
compile project(":netty-http-common")
compile "io.netty:netty-transport-native-epoll:${project.property('netty.version')}"
}

View file

@ -0,0 +1,29 @@
package org.xbib.netty.http.epoll;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import org.xbib.netty.http.common.TransportProvider;
import java.util.concurrent.ThreadFactory;
public class EpollTransportProvider implements TransportProvider {
@Override
public EventLoopGroup createEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
return Epoll.isAvailable() ? new EpollEventLoopGroup(nThreads, threadFactory) : null;
}
@Override
public Class<? extends SocketChannel> createSocketChannelClass() {
return Epoll.isAvailable() ? EpollSocketChannel.class : null;
}
@Override
public Class<? extends ServerSocketChannel> createServerSocketChannelClass() {
return Epoll.isAvailable() ? EpollServerSocketChannel.class : null;
}
}

View file

@ -0,0 +1 @@
org.xbib.netty.http.epoll.EpollTransportProvider

View file

@ -0,0 +1,4 @@
dependencies {
compile project(":netty-http-common")
compile "io.netty:netty-transport-native-kqueue:${project.property('netty.version')}"
}

View file

@ -0,0 +1,29 @@
package org.xbib.netty.http.kqueue;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.kqueue.KQueue;
import io.netty.channel.kqueue.KQueueEventLoopGroup;
import io.netty.channel.kqueue.KQueueServerSocketChannel;
import io.netty.channel.kqueue.KQueueSocketChannel;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import org.xbib.netty.http.common.TransportProvider;
import java.util.concurrent.ThreadFactory;
public class KqueueTransportProvider implements TransportProvider {
@Override
public EventLoopGroup createEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
return KQueue.isAvailable() ? new KQueueEventLoopGroup(nThreads, threadFactory) : null;
}
@Override
public Class<? extends SocketChannel> createSocketChannelClass() {
return KQueue.isAvailable() ? KQueueSocketChannel.class : null;
}
@Override
public Class<? extends ServerSocketChannel> createServerSocketChannelClass() {
return KQueue.isAvailable() ? KQueueServerSocketChannel.class : null;
}
}

View file

@ -0,0 +1 @@
org.xbib.netty.http.kqueue.KqueueTransportProvider

View file

@ -1,8 +1,16 @@
import org.apache.tools.ant.taskdefs.condition.Os
dependencies { dependencies {
compile project(":netty-http-common") compile project(":netty-http-common")
compile project(":netty-http-server-api") compile project(":netty-http-server-api")
compile "io.netty:netty-transport-native-epoll:${project.property('netty.version')}" // we are an Mac OS X 10.12, but tcnative > 2.0.26 is for 10.13+
compile "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}" if (Os.isFamily(Os.FAMILY_MAC)) {
compile "org.bouncycastle:bcpkix-jdk15on:${project.property('bouncycastle.version')}" runtime "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative-legacy-macosx.version')}"
testCompile project(":netty-http-client") runtime project(':netty-http-kqueue')
} else {
runtime "io.netty:netty-tcnative-boringssl-static:${project.property('tcnative.version')}"
runtime project(':netty-http-epoll')
}
testCompile project(":netty-http-client")
testRuntime project(":netty-http-bouncycastle")
} }

View file

@ -8,13 +8,13 @@ import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider; import io.netty.handler.ssl.SslProvider;
import org.xbib.netty.http.common.HttpAddress; import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.common.ServerCertificateProvider;
import org.xbib.netty.http.common.security.SecurityUtil; import org.xbib.netty.http.common.security.SecurityUtil;
import org.xbib.netty.http.server.api.ServerRequest; import org.xbib.netty.http.server.api.ServerRequest;
import org.xbib.netty.http.server.api.ServerResponse; import org.xbib.netty.http.server.api.ServerResponse;
import org.xbib.netty.http.server.endpoint.HttpEndpoint; import org.xbib.netty.http.server.endpoint.HttpEndpoint;
import org.xbib.netty.http.server.endpoint.HttpEndpointResolver; import org.xbib.netty.http.server.endpoint.HttpEndpointResolver;
import org.xbib.netty.http.server.api.Filter; import org.xbib.netty.http.server.api.Filter;
import org.xbib.netty.http.server.security.tls.SelfSignedCertificate;
import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.TrustManagerFactory;
import java.io.IOException; import java.io.IOException;
@ -27,13 +27,18 @@ import java.util.Collections;
import java.util.LinkedHashSet; import java.util.LinkedHashSet;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.ServiceLoader;
import java.util.Set; import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
/** /**
* The {@code Domain} class represents a virtual server with a name, with or without SSL. * The {@code Domain} class represents a virtual server with a name, with or without SSL.
*/ */
public class Domain { public class Domain {
private static final Logger logger = Logger.getLogger(Domain.class.getName());
private final String name; private final String name;
private final Set<String> aliases; private final Set<String> aliases;
@ -259,10 +264,19 @@ public class Domain {
} }
public Builder setSelfCert() throws Exception { public Builder setSelfCert() throws Exception {
SelfSignedCertificate selfSignedCertificate = new SelfSignedCertificate(serverName); ServiceLoader<ServerCertificateProvider> serverCertificateProviders = ServiceLoader.load(ServerCertificateProvider.class);
setKeyCertChainInputStream(selfSignedCertificate.certificate()); for (ServerCertificateProvider serverCertificateProvider : serverCertificateProviders) {
setKeyInputStream(selfSignedCertificate.privateKey()); if ("org.xbib.netty.http.bouncycastle.BouncyCastleSelfSignedCertificateProvider".equals(serverCertificateProvider.getClass().getName())) {
setKeyPassword(null); serverCertificateProvider.prepare(serverName);
setKeyCertChainInputStream(serverCertificateProvider.getCertificateChain());
setKeyInputStream(serverCertificateProvider.getPrivateKey());
setKeyPassword(serverCertificateProvider.getKeyPassword());
logger.log(Level.INFO, "self signed certificate installed");
}
}
if (keyCertChainInputStream == null) {
logger.log(Level.WARNING, "unable to install self signed certificate. Is netty-http-bouncycastle present?");
}
return this; return this;
} }

View file

@ -6,9 +6,6 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
@ -20,6 +17,7 @@ import io.netty.util.DomainNameMapping;
import io.netty.util.DomainNameMappingBuilder; import io.netty.util.DomainNameMappingBuilder;
import org.xbib.netty.http.common.HttpAddress; import org.xbib.netty.http.common.HttpAddress;
import org.xbib.netty.http.common.NetworkUtils; import org.xbib.netty.http.common.NetworkUtils;
import org.xbib.netty.http.common.TransportProvider;
import org.xbib.netty.http.server.api.HttpChannelInitializer; import org.xbib.netty.http.server.api.HttpChannelInitializer;
import org.xbib.netty.http.server.api.ProtocolProvider; import org.xbib.netty.http.server.api.ProtocolProvider;
import org.xbib.netty.http.server.api.ServerResponse; import org.xbib.netty.http.server.api.ServerResponse;
@ -328,9 +326,15 @@ public final class Server implements AutoCloseable {
EventLoopGroup parentEventLoopGroup ) { EventLoopGroup parentEventLoopGroup ) {
EventLoopGroup eventLoopGroup = parentEventLoopGroup; EventLoopGroup eventLoopGroup = parentEventLoopGroup;
if (eventLoopGroup == null) { if (eventLoopGroup == null) {
eventLoopGroup = serverConfig.isEpoll() ? ServiceLoader<TransportProvider> transportProviders = ServiceLoader.load(TransportProvider.class);
new EpollEventLoopGroup(serverConfig.getParentThreadCount(), new HttpServerParentThreadFactory()) : for (TransportProvider transportProvider : transportProviders) {
new NioEventLoopGroup(serverConfig.getParentThreadCount(), new HttpServerParentThreadFactory()); if (serverConfig.getTransportProviderName() == null || serverConfig.getTransportProviderName().equals(transportProvider.getClass().getName())) {
eventLoopGroup = transportProvider.createEventLoopGroup(serverConfig.getParentThreadCount(), new HttpServerParentThreadFactory());
}
}
}
if (eventLoopGroup == null) {
eventLoopGroup = new NioEventLoopGroup(serverConfig.getParentThreadCount(), new HttpServerParentThreadFactory());
} }
return eventLoopGroup; return eventLoopGroup;
} }
@ -339,9 +343,15 @@ public final class Server implements AutoCloseable {
EventLoopGroup childEventLoopGroup ) { EventLoopGroup childEventLoopGroup ) {
EventLoopGroup eventLoopGroup = childEventLoopGroup; EventLoopGroup eventLoopGroup = childEventLoopGroup;
if (eventLoopGroup == null) { if (eventLoopGroup == null) {
eventLoopGroup = serverConfig.isEpoll() ? ServiceLoader<TransportProvider> transportProviders = ServiceLoader.load(TransportProvider.class);
new EpollEventLoopGroup(serverConfig.getChildThreadCount(), new HttpServerChildThreadFactory()) : for (TransportProvider transportProvider : transportProviders) {
new NioEventLoopGroup(serverConfig.getChildThreadCount(), new HttpServerChildThreadFactory()); if (serverConfig.getTransportProviderName() == null || serverConfig.getTransportProviderName().equals(transportProvider.getClass().getName())) {
eventLoopGroup = transportProvider.createEventLoopGroup(serverConfig.getChildThreadCount(), new HttpServerChildThreadFactory());
}
}
}
if (eventLoopGroup == null) {
eventLoopGroup = new NioEventLoopGroup(serverConfig.getChildThreadCount(), new HttpServerChildThreadFactory());
} }
return eventLoopGroup; return eventLoopGroup;
} }
@ -350,12 +360,16 @@ public final class Server implements AutoCloseable {
Class<? extends ServerSocketChannel> socketChannelClass) { Class<? extends ServerSocketChannel> socketChannelClass) {
Class<? extends ServerSocketChannel> channelClass = socketChannelClass; Class<? extends ServerSocketChannel> channelClass = socketChannelClass;
if (channelClass == null) { if (channelClass == null) {
if (serverConfig.isEpoll() && Epoll.isAvailable()) { ServiceLoader<TransportProvider> transportProviders = ServiceLoader.load(TransportProvider.class);
channelClass = EpollServerSocketChannel.class; for (TransportProvider transportProvider : transportProviders) {
} else { if (serverConfig.getTransportProviderName() == null || serverConfig.getTransportProviderName().equals(transportProvider.getClass().getName())) {
channelClass = NioServerSocketChannel.class; channelClass = transportProvider.createServerSocketChannelClass();
} }
} }
}
if (channelClass == null) {
channelClass = NioServerSocketChannel.class;
}
return channelClass; return channelClass;
} }
@ -470,6 +484,11 @@ public final class Server implements AutoCloseable {
return this; return this;
} }
public Builder setTransportProviderName(String transportProviderName) {
this.serverConfig.setTransportProviderName(transportProviderName);
return this;
}
public Builder setParentEventLoopGroup(EventLoopGroup parentEventLoopGroup) { public Builder setParentEventLoopGroup(EventLoopGroup parentEventLoopGroup) {
this.parentEventLoopGroup = parentEventLoopGroup; this.parentEventLoopGroup = parentEventLoopGroup;
return this; return this;
@ -485,11 +504,6 @@ public final class Server implements AutoCloseable {
return this; return this;
} }
public Builder setUseEpoll(boolean useEpoll) {
this.serverConfig.setEpoll(useEpoll);
return this;
}
public Builder setConnectTimeoutMillis(int connectTimeoutMillis) { public Builder setConnectTimeoutMillis(int connectTimeoutMillis) {
this.serverConfig.setConnectTimeoutMillis(connectTimeoutMillis); this.serverConfig.setConnectTimeoutMillis(connectTimeoutMillis);
return this; return this;

View file

@ -1,7 +1,6 @@
package org.xbib.netty.http.server; package org.xbib.netty.http.server;
import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.epoll.Epoll;
import io.netty.handler.codec.http2.Http2Settings; import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LogLevel;
import io.netty.handler.ssl.CipherSuiteFilter; import io.netty.handler.ssl.CipherSuiteFilter;
@ -35,10 +34,7 @@ public class ServerConfig {
*/ */
LogLevel DEBUG_LOG_LEVEL = LogLevel.DEBUG; LogLevel DEBUG_LOG_LEVEL = LogLevel.DEBUG;
/** String TRANSPORT_PROVIDER_NAME = null;
* The default for selecting epoll. If available, select epoll.
*/
boolean EPOLL = Epoll.isAvailable();
/** /**
* Let Netty decide about parent thread count. * Let Netty decide about parent thread count.
@ -200,7 +196,7 @@ public class ServerConfig {
private LogLevel debugLogLevel = Defaults.DEBUG_LOG_LEVEL; private LogLevel debugLogLevel = Defaults.DEBUG_LOG_LEVEL;
private boolean epoll = Defaults.EPOLL; private String transportProviderName = Defaults.TRANSPORT_PROVIDER_NAME;
private int parentThreadCount = Defaults.PARENT_THREAD_COUNT; private int parentThreadCount = Defaults.PARENT_THREAD_COUNT;
@ -293,23 +289,13 @@ public class ServerConfig {
return debugLogLevel; return debugLogLevel;
} }
public ServerConfig enableEpoll() { public ServerConfig setTransportProviderName(String transportProviderName) {
this.epoll = true; this.transportProviderName = transportProviderName;
return this; return this;
} }
public ServerConfig disableEpoll() { public String getTransportProviderName() {
this.epoll = false; return transportProviderName;
return this;
}
public ServerConfig setEpoll(boolean epoll) {
this.epoll = epoll;
return this;
}
public boolean isEpoll() {
return epoll;
} }
public ServerConfig setParentThreadCount(int parentThreadCount) { public ServerConfig setParentThreadCount(int parentThreadCount) {

View file

@ -128,7 +128,7 @@ class CleartextTest {
@Test @Test
void testMultithreadPooledClearTextHttp2() throws Exception { void testMultithreadPooledClearTextHttp2() throws Exception {
int threads = 2; int threads = 2;
int loop = 1000; int loop = 1024;
HttpAddress httpAddress = HttpAddress.http2("localhost", 8008); HttpAddress httpAddress = HttpAddress.http2("localhost", 8008);
Domain domain = Domain.builder(httpAddress) Domain domain = Domain.builder(httpAddress)
.singleEndpoint("/", (request, response) -> .singleEndpoint("/", (request, response) ->
@ -179,13 +179,13 @@ class CleartextTest {
}); });
} }
executorService.shutdown(); executorService.shutdown();
boolean terminated = executorService.awaitTermination(10L, TimeUnit.SECONDS); boolean terminated = executorService.awaitTermination(20L, TimeUnit.SECONDS);
executorService.shutdownNow(); executorService.shutdownNow();
logger.log(Level.INFO, "terminated = " + terminated + ", now waiting for transport to complete"); logger.log(Level.INFO, "terminated = " + terminated + ", now waiting for transport to complete");
transport.get(10L, TimeUnit.SECONDS); transport.get(20L, TimeUnit.SECONDS);
} finally { } finally {
server.shutdownGracefully(10L, TimeUnit.SECONDS); server.shutdownGracefully(20L, TimeUnit.SECONDS);
client.shutdownGracefully(10L, TimeUnit.SECONDS); client.shutdownGracefully(20L, TimeUnit.SECONDS);
} }
logger.log(Level.INFO, "server requests = " + server.getRequestCounter() + logger.log(Level.INFO, "server requests = " + server.getRequestCounter() +
" server responses = " + server.getResponseCounter()); " server responses = " + server.getResponseCounter());

View file

@ -71,7 +71,7 @@ class EncryptedTest {
@Test @Test
void testPooledSecureHttp2() throws Exception { void testPooledSecureHttp2() throws Exception {
int loop = 4096; int loop = 1024;
HttpAddress httpAddress = HttpAddress.secureHttp2("localhost", 8143); HttpAddress httpAddress = HttpAddress.secureHttp2("localhost", 8143);
Server server = Server.builder(Domain.builder(httpAddress) Server server = Server.builder(Domain.builder(httpAddress)
.setSelfCert() .setSelfCert()
@ -116,8 +116,8 @@ class EncryptedTest {
@Test @Test
void testMultithreadPooledSecureHttp2() throws Exception { void testMultithreadPooledSecureHttp2() throws Exception {
int threads = 4; int threads = 2;
int loop = 4 * 1024; int loop = 1024;
HttpAddress httpAddress = HttpAddress.secureHttp2("localhost", 8143); HttpAddress httpAddress = HttpAddress.secureHttp2("localhost", 8143);
Server server = Server.builder(Domain.builder(httpAddress) Server server = Server.builder(Domain.builder(httpAddress)
.setSelfCert() .setSelfCert()
@ -164,12 +164,13 @@ class EncryptedTest {
}); });
} }
executorService.shutdown(); executorService.shutdown();
boolean terminated = executorService.awaitTermination(60, TimeUnit.SECONDS); boolean terminated = executorService.awaitTermination(20, TimeUnit.SECONDS);
executorService.shutdownNow();
logger.log(Level.INFO, "terminated = " + terminated + ", now waiting for transport to complete"); logger.log(Level.INFO, "terminated = " + terminated + ", now waiting for transport to complete");
transport.get(60, TimeUnit.SECONDS); transport.get(20, TimeUnit.SECONDS);
} finally { } finally {
client.shutdownGracefully(); client.shutdownGracefully(20, TimeUnit.SECONDS);
server.shutdownGracefully(); server.shutdownGracefully(20, TimeUnit.SECONDS);
} }
assertEquals(threads * loop , counter.get()); assertEquals(threads * loop , counter.get());
} }

View file

@ -1,4 +1,7 @@
include 'netty-http-common' include 'netty-http-common'
include 'netty-http-epoll'
include 'netty-http-kqueue'
include 'netty-http-bouncycastle'
include 'netty-http-client-api' include 'netty-http-client-api'
include 'netty-http-client' include 'netty-http-client'
include 'netty-http-client-rest' include 'netty-http-client-rest'