do not use ServiceLoader instances across threads

This commit is contained in:
Jörg Prante 2024-03-15 13:47:36 +01:00
parent d624688f08
commit a270ea2854
6 changed files with 116 additions and 117 deletions

View file

@ -1,3 +1,3 @@
group = org.xbib group = org.xbib
name = net-http name = net-http
version = 4.3.0 version = 4.4.0

View file

@ -53,8 +53,7 @@ class Https1Test {
@Test @Test
void testGoogleHttp() throws Exception { void testGoogleHttp() throws Exception {
NettyHttpClientConfig config = new NettyHttpsClientConfig() NettyHttpClientConfig config = new NettyHttpsClientConfig();
.setProtocolNegotiation(true);
try (NettyHttpClient client = NettyHttpClient.builder() try (NettyHttpClient client = NettyHttpClient.builder()
.setConfig(config) .setConfig(config)
.build()) { .build()) {

View file

@ -37,9 +37,7 @@ public class NettyHttpClient implements HttpClient<HttpRequest, HttpResponse>, C
private final AtomicBoolean closed; private final AtomicBoolean closed;
private final HttpChannelInitializer httpChannelInitializer; private HttpChannelInitializer httpChannelInitializer;
private final ServiceLoader<HttpChannelInitializer> httpChannelInitializerServiceLoader;
private Pool pool; private Pool pool;
@ -53,7 +51,6 @@ public class NettyHttpClient implements HttpClient<HttpRequest, HttpResponse>, C
this.bootstrap = bootstrap; this.bootstrap = bootstrap;
this.closed = new AtomicBoolean(false); this.closed = new AtomicBoolean(false);
this.httpChannelInitializer = builder.httpChannelInitializer; this.httpChannelInitializer = builder.httpChannelInitializer;
this.httpChannelInitializerServiceLoader = ServiceLoader.load(HttpChannelInitializer.class);
createBoundedPool(builder.nettyHttpClientConfig, bootstrap); createBoundedPool(builder.nettyHttpClientConfig, bootstrap);
this.interactions = new CopyOnWriteArrayList<>(); this.interactions = new CopyOnWriteArrayList<>();
} }
@ -209,13 +206,21 @@ public class NettyHttpClient implements HttpClient<HttpRequest, HttpResponse>, C
} }
/**
* The lookup here needs to be thread-safe.
* @param httpAddress the HTTP address for the channel initializer to look up.
* @return the channel initializer
*/
private HttpChannelInitializer lookupChannelInitializer(HttpAddress httpAddress) { private HttpChannelInitializer lookupChannelInitializer(HttpAddress httpAddress) {
if (httpChannelInitializer != null || httpAddress == null) { if (httpChannelInitializer != null || httpAddress == null) {
return httpChannelInitializer; return httpChannelInitializer;
} }
for (HttpChannelInitializer initializer : httpChannelInitializerServiceLoader) { synchronized (this) {
if (initializer.supports(httpAddress)) { for (HttpChannelInitializer initializer : ServiceLoader.load(HttpChannelInitializer.class)) {
return initializer; if (initializer.supports(httpAddress)) {
httpChannelInitializer = initializer;
return initializer;
}
} }
} }
throw new IllegalStateException("no channel initializer found for address " + httpAddress + ", check service provider"); throw new IllegalStateException("no channel initializer found for address " + httpAddress + ", check service provider");

View file

@ -24,6 +24,8 @@ public class NettyHttpClientBuilder {
private static final Logger logger = Logger.getLogger(NettyHttpClientBuilder.class.getName()); private static final Logger logger = Logger.getLogger(NettyHttpClientBuilder.class.getName());
private static final Object lock = new Object();
NettyHttpClientConfig nettyHttpClientConfig; NettyHttpClientConfig nettyHttpClientConfig;
ByteBufAllocator byteBufAllocator; ByteBufAllocator byteBufAllocator;
@ -93,68 +95,65 @@ public class NettyHttpClientBuilder {
if (byteBufAllocator == null) { if (byteBufAllocator == null) {
byteBufAllocator = ByteBufAllocator.DEFAULT; byteBufAllocator = ByteBufAllocator.DEFAULT;
} }
EventLoopGroup myEventLoopGroup = createEventLoopGroup(nettyHttpClientConfig, eventLoopGroup); createEventLoopGroup(nettyHttpClientConfig);
Class<? extends SocketChannel> mySocketChannelClass = createChannelClass(nettyHttpClientConfig, socketChannelClass); createChannelClass(nettyHttpClientConfig);
Bootstrap bootstrap = createBootstrap(nettyHttpClientConfig, byteBufAllocator, myEventLoopGroup, mySocketChannelClass); Bootstrap bootstrap = createBootstrap(nettyHttpClientConfig, byteBufAllocator, eventLoopGroup, socketChannelClass);
if (nettyCustomizer != null) { if (nettyCustomizer != null) {
nettyCustomizer.afterBootstrapInitialized(bootstrap); nettyCustomizer.afterBootstrapInitialized(bootstrap);
} }
return new NettyHttpClient(this, myEventLoopGroup, bootstrap); return new NettyHttpClient(this, eventLoopGroup, bootstrap);
} }
protected NettyHttpClientConfig createEmptyConfig() { protected NettyHttpClientConfig createEmptyConfig() {
return new NettyHttpClientConfig(); return new NettyHttpClientConfig();
} }
private static EventLoopGroup createEventLoopGroup(NettyHttpClientConfig clientConfig, private void createEventLoopGroup(NettyHttpClientConfig clientConfig) {
EventLoopGroup eventLoopGroup) {
if (eventLoopGroup != null) { if (eventLoopGroup != null) {
return eventLoopGroup; return;
} }
EventLoopGroup myEventLoopGroup = null; synchronized (lock) {
ThreadFactory threadFactory = new NamedThreadFactory("org-xbib-net-http-netty-client"); ThreadFactory threadFactory = new NamedThreadFactory("org-xbib-net-http-netty-client");
ServiceLoader<ClientTransportProvider> transportProviders = ServiceLoader.load(ClientTransportProvider.class); for (ClientTransportProvider serverTransportProvider : ServiceLoader.load(ClientTransportProvider.class)) {
for (ClientTransportProvider serverTransportProvider : transportProviders) { if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "found event loop group provider = " + serverTransportProvider);
}
if (clientConfig.getTransportProviderName() == null || clientConfig.getTransportProviderName().equals(serverTransportProvider.getClass().getName())) {
eventLoopGroup = serverTransportProvider.createEventLoopGroup(clientConfig.getThreadCount(), threadFactory);
break;
}
}
if (eventLoopGroup == null) {
eventLoopGroup = new NioEventLoopGroup(clientConfig.getThreadCount(), threadFactory);
}
if (logger.isLoggable(Level.FINEST)) { if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "found event loop group provider = " + serverTransportProvider); logger.log(Level.FINEST, "event loop group class: " + eventLoopGroup.getClass().getName());
}
if (clientConfig.getTransportProviderName() == null || clientConfig.getTransportProviderName().equals(serverTransportProvider.getClass().getName())) {
myEventLoopGroup = serverTransportProvider.createEventLoopGroup(clientConfig.getThreadCount(), threadFactory);
break;
} }
} }
if (myEventLoopGroup == null) {
myEventLoopGroup = new NioEventLoopGroup(clientConfig.getThreadCount(), threadFactory);
}
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "event loop group class: " + myEventLoopGroup.getClass().getName());
}
return myEventLoopGroup;
} }
private static Class<? extends SocketChannel> createChannelClass(NettyHttpClientConfig clientConfig, private void createChannelClass(NettyHttpClientConfig clientConfig) {
Class<? extends SocketChannel> socketChannelClass) {
if (socketChannelClass != null) { if (socketChannelClass != null) {
return socketChannelClass; return;
} }
Class<? extends SocketChannel> myChannelClass = null; synchronized (lock) {
ServiceLoader<ClientTransportProvider> transportProviders = ServiceLoader.load(ClientTransportProvider.class); ServiceLoader<ClientTransportProvider> transportProviders = ServiceLoader.load(ClientTransportProvider.class);
for (ClientTransportProvider transportProvider : transportProviders) { for (ClientTransportProvider transportProvider : transportProviders) {
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "found socket channel provider = " + transportProvider);
}
if (clientConfig.getTransportProviderName() == null || clientConfig.getTransportProviderName().equals(transportProvider.getClass().getName())) {
socketChannelClass = transportProvider.createSocketChannelClass();
break;
}
}
if (socketChannelClass == null) {
socketChannelClass = NioSocketChannel.class;
}
if (logger.isLoggable(Level.FINEST)) { if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "found socket channel provider = " + transportProvider); logger.log(Level.FINEST, "socket channel class: " + socketChannelClass.getName());
}
if (clientConfig.getTransportProviderName() == null || clientConfig.getTransportProviderName().equals(transportProvider.getClass().getName())) {
myChannelClass = transportProvider.createSocketChannelClass();
break;
} }
} }
if (myChannelClass == null) {
myChannelClass = NioSocketChannel.class;
}
if (logger.isLoggable(Level.FINEST)) {
logger.log(Level.FINEST, "socket channel class: " + myChannelClass.getName());
}
return myChannelClass;
} }
private static Bootstrap createBootstrap(NettyHttpClientConfig nettyHttpClientConfig, private static Bootstrap createBootstrap(NettyHttpClientConfig nettyHttpClientConfig,

View file

@ -49,24 +49,18 @@ public class NettyHttpServer implements HttpServer {
private final Class<? extends ServerSocketChannel> socketChannelClass; private final Class<? extends ServerSocketChannel> socketChannelClass;
private final HttpChannelInitializer httpChannelInitializer;
private final ServiceLoader<HttpChannelInitializer> serviceLoader;
private final Collection<ChannelFuture> channelFutures; private final Collection<ChannelFuture> channelFutures;
private final Collection<Channel> channels; private final Collection<Channel> channels;
NettyHttpServer(NettyHttpServerBuilder builder, private HttpChannelInitializer httpChannelInitializer;
EventLoopGroup parentEventLoopGroup,
EventLoopGroup childEventLoopGroup, NettyHttpServer(NettyHttpServerBuilder builder) {
Class<? extends ServerSocketChannel> socketChannelClass) {
this.builder = builder; this.builder = builder;
this.parentEventLoopGroup = parentEventLoopGroup; this.parentEventLoopGroup = builder.parentEventLoopGroup;
this.childEventLoopGroup = childEventLoopGroup; this.childEventLoopGroup = builder.childEventLoopGroup;
this.socketChannelClass = socketChannelClass; this.socketChannelClass = builder.socketChannelClass;
this.httpChannelInitializer = builder.httpChannelInitializer; this.httpChannelInitializer = builder.httpChannelInitializer;
this.serviceLoader = ServiceLoader.load(HttpChannelInitializer.class);
this.channelFutures = new ArrayList<>(); this.channelFutures = new ArrayList<>();
this.channels = new ArrayList<>(); this.channels = new ArrayList<>();
logger.log(Level.FINE, "parent event loop group = " + parentEventLoopGroup + logger.log(Level.FINE, "parent event loop group = " + parentEventLoopGroup +
@ -129,7 +123,8 @@ public class NettyHttpServer implements HttpServer {
} }
}); });
channel.attr(NettyHttpServerConfig.ATTRIBUTE_HTTP_ADDRESS).set(httpAddress); channel.attr(NettyHttpServerConfig.ATTRIBUTE_HTTP_ADDRESS).set(httpAddress);
createChannelInitializer(httpAddress).init(channel, getServer(), builder.nettyCustomizer); createChannelInitializer(httpAddress);
httpChannelInitializer.init(channel, getServer(), builder.nettyCustomizer);
} }
}); });
if (getNettyHttpServerConfig().isDebug()) { if (getNettyHttpServerConfig().isDebug()) {
@ -248,15 +243,18 @@ public class NettyHttpServer implements HttpServer {
logger.log(Level.INFO, "server shutdown complete"); logger.log(Level.INFO, "server shutdown complete");
} }
private HttpChannelInitializer createChannelInitializer(HttpAddress address) { private void createChannelInitializer(HttpAddress address) {
if (httpChannelInitializer != null && httpChannelInitializer.supports(address)) { if (httpChannelInitializer != null && httpChannelInitializer.supports(address)) {
return httpChannelInitializer; return;
} }
for (HttpChannelInitializer httpChannelInitializer : serviceLoader) { synchronized (this) {
if (httpChannelInitializer.supports(address)) { for (HttpChannelInitializer httpChannelInitializer : ServiceLoader.load(HttpChannelInitializer.class)) {
return httpChannelInitializer; if (httpChannelInitializer.supports(address)) {
this.httpChannelInitializer = httpChannelInitializer;
return;
}
} }
throw new IllegalStateException("no channel initializer found for address " + address);
} }
throw new IllegalStateException("no channel initializer found for address " + address);
} }
} }

View file

@ -14,6 +14,8 @@ import java.util.concurrent.ThreadFactory;
public class NettyHttpServerBuilder implements HttpServerBuilder { public class NettyHttpServerBuilder implements HttpServerBuilder {
private static final Object lock = new Object();
NettyHttpServerConfig nettyHttpServerConfig; NettyHttpServerConfig nettyHttpServerConfig;
Application application; Application application;
@ -83,70 +85,66 @@ public class NettyHttpServerBuilder implements HttpServerBuilder {
} }
public NettyHttpServer build() { public NettyHttpServer build() {
return new NettyHttpServer(this, createParentEventLoopGroup(nettyHttpServerConfig);
createParentEventLoopGroup(nettyHttpServerConfig, parentEventLoopGroup), createChildEventLoopGroup(nettyHttpServerConfig);
createChildEventLoopGroup(nettyHttpServerConfig, childEventLoopGroup), createSocketChannelClass(nettyHttpServerConfig);
createSocketChannelClass(nettyHttpServerConfig, socketChannelClass)); return new NettyHttpServer(this);
} }
private static EventLoopGroup createParentEventLoopGroup(NettyHttpServerConfig httpServerConfig, private void createParentEventLoopGroup(NettyHttpServerConfig httpServerConfig) {
EventLoopGroup parentEventLoopGroup) {
if (parentEventLoopGroup != null) { if (parentEventLoopGroup != null) {
return parentEventLoopGroup; return;
} }
EventLoopGroup eventLoopGroup = null; synchronized (lock) {
ThreadFactory threadFactory = new NamedThreadFactory("org-xbib-net-http-netty-server-parent"); ThreadFactory threadFactory = new NamedThreadFactory("org-xbib-net-http-netty-server-parent");
ServiceLoader<ServerTransportProvider> transportProviders = ServiceLoader.load(ServerTransportProvider.class); for (ServerTransportProvider serverTransportProvider : ServiceLoader.load(ServerTransportProvider.class)) {
for (ServerTransportProvider serverTransportProvider : transportProviders) { if (httpServerConfig.getTransportProviderName() == null ||
if (httpServerConfig.getTransportProviderName() == null || httpServerConfig.getTransportProviderName().equals(serverTransportProvider.getClass().getName())) {
httpServerConfig.getTransportProviderName().equals(serverTransportProvider.getClass().getName())) { parentEventLoopGroup = serverTransportProvider.createEventLoopGroup(httpServerConfig.getParentThreadCount(),
eventLoopGroup = serverTransportProvider.createEventLoopGroup(httpServerConfig.getParentThreadCount(), threadFactory);
threadFactory); }
}
if (parentEventLoopGroup == null) {
parentEventLoopGroup = new NioEventLoopGroup(httpServerConfig.getParentThreadCount(), threadFactory);
} }
} }
if (eventLoopGroup == null) {
eventLoopGroup = new NioEventLoopGroup(httpServerConfig.getParentThreadCount(), threadFactory);
}
return eventLoopGroup;
} }
private static EventLoopGroup createChildEventLoopGroup(NettyHttpServerConfig httpServerConfig, private void createChildEventLoopGroup(NettyHttpServerConfig httpServerConfig) {
EventLoopGroup childEventLoopGroup) {
if (childEventLoopGroup != null) { if (childEventLoopGroup != null) {
return childEventLoopGroup; return;
} }
EventLoopGroup eventLoopGroup = null; synchronized (this) {
ThreadFactory threadFactory = new NamedThreadFactory("org-xbib-net-http-netty-server-child"); ThreadFactory threadFactory = new NamedThreadFactory("org-xbib-net-http-netty-server-child");
ServiceLoader<ServerTransportProvider> transportProviders = ServiceLoader.load(ServerTransportProvider.class); ServiceLoader<ServerTransportProvider> transportProviders = ServiceLoader.load(ServerTransportProvider.class);
for (ServerTransportProvider serverTransportProvider : transportProviders) { for (ServerTransportProvider serverTransportProvider : transportProviders) {
if (httpServerConfig.getTransportProviderName() == null || if (httpServerConfig.getTransportProviderName() == null ||
httpServerConfig.getTransportProviderName().equals(serverTransportProvider.getClass().getName())) { httpServerConfig.getTransportProviderName().equals(serverTransportProvider.getClass().getName())) {
eventLoopGroup = serverTransportProvider.createEventLoopGroup(httpServerConfig.getChildThreadCount(), childEventLoopGroup = serverTransportProvider.createEventLoopGroup(httpServerConfig.getChildThreadCount(),
threadFactory); threadFactory);
}
}
if (childEventLoopGroup == null) {
childEventLoopGroup = new NioEventLoopGroup(httpServerConfig.getChildThreadCount(), threadFactory);
} }
} }
if (eventLoopGroup == null) {
eventLoopGroup = new NioEventLoopGroup(httpServerConfig.getChildThreadCount(), threadFactory);
}
return eventLoopGroup;
} }
private static Class<? extends ServerSocketChannel> createSocketChannelClass(NettyHttpServerConfig httpServerConfig, private void createSocketChannelClass(NettyHttpServerConfig httpServerConfig) {
Class<? extends ServerSocketChannel> socketChannelClass) {
if (socketChannelClass != null) { if (socketChannelClass != null) {
return socketChannelClass; return;
} }
Class<? extends ServerSocketChannel> channelClass = null; synchronized (lock) {
ServiceLoader<ServerTransportProvider> transportProviders = ServiceLoader.load(ServerTransportProvider.class); ServiceLoader<ServerTransportProvider> transportProviders = ServiceLoader.load(ServerTransportProvider.class);
for (ServerTransportProvider serverTransportProvider : transportProviders) { for (ServerTransportProvider serverTransportProvider : transportProviders) {
if (httpServerConfig.getTransportProviderName() == null || httpServerConfig.getTransportProviderName().equals(serverTransportProvider.getClass().getName())) { if (httpServerConfig.getTransportProviderName() == null || httpServerConfig.getTransportProviderName().equals(serverTransportProvider.getClass().getName())) {
channelClass = serverTransportProvider.createServerSocketChannelClass(); socketChannelClass = serverTransportProvider.createServerSocketChannelClass();
break; break;
}
}
if (socketChannelClass == null) {
socketChannelClass = NioServerSocketChannel.class;
} }
} }
if (channelClass == null) {
channelClass = NioServerSocketChannel.class;
}
return channelClass;
} }
} }