working on events

This commit is contained in:
Jörg Prante 2024-01-19 17:19:13 +01:00
parent 99cc9e2c90
commit 420611ed9d
3 changed files with 157 additions and 25 deletions

View file

@ -33,20 +33,20 @@ public final class EventManager {
private final Builder builder; private final Builder builder;
private final Map<Class<? extends EventManagerService>, EventManagerService> services; private final Map<Class<? extends EventManagerService>, EventManagerService> eventManagerServices;
private EventManager(Builder builder) { private EventManager(Builder builder) {
this.builder = builder; this.builder = builder;
this.services = new HashMap<>(); this.eventManagerServices = new HashMap<>();
services.put(GenericEventManagerService.class, new GenericEventManagerService().init(this)); eventManagerServices.put(GenericEventManagerService.class, new GenericEventManagerService().init(this));
services.put(ClockEventManagerService.class, new ClockEventManagerService().init(this)); eventManagerServices.put(ClockEventManagerService.class, new ClockEventManagerService().init(this));
services.put(TimerEventManagerService.class, new TimerEventManagerService().init(this)); eventManagerServices.put(TimerEventManagerService.class, new TimerEventManagerService().init(this));
services.put(FileFollowEventManagerService.class, new FileFollowEventManagerService().init(this)); eventManagerServices.put(FileFollowEventManagerService.class, new FileFollowEventManagerService().init(this));
services.put(PathEventManagerService.class, new PathEventManagerService().init(this)); eventManagerServices.put(PathEventManagerService.class, new PathEventManagerService().init(this));
for (EventManagerService service : ServiceLoader.load(EventManagerService.class)) { for (EventManagerService service : ServiceLoader.load(EventManagerService.class)) {
services.put(service.getClass(), service.init(this)); eventManagerServices.put(service.getClass(), service.init(this));
} }
logger.log(Level.INFO, "installed event service managers = " + services.keySet()); logger.log(Level.INFO, "installed event service managers = " + eventManagerServices.keySet());
} }
public Settings getSettings() { public Settings getSettings() {
@ -65,36 +65,36 @@ public final class EventManager {
return builder.executorService; return builder.executorService;
} }
public static Builder builder(Settings settings) { public static Builder builder() {
return new Builder(settings); return new Builder();
} }
public void submit(Event event) { public void submit(Event event) {
getGenericEventManagerService().post(event); getGenericEventManagerService().post(event);
} }
public EventManagerService getEventManagerService(Class<? extends EventManagerService> cl) { public <T> EventManagerService getEventManagerService(Class<T> cl) {
return services.get(cl); return eventManagerServices.get(cl);
} }
public GenericEventManagerService getGenericEventManagerService() { public GenericEventManagerService getGenericEventManagerService() {
return (GenericEventManagerService) services.get(GenericEventManagerService.class); return (GenericEventManagerService) eventManagerServices.get(GenericEventManagerService.class);
} }
public ClockEventManagerService getClockEventManagerService() { public ClockEventManagerService getClockEventManagerService() {
return (ClockEventManagerService) services.get(ClockEventManagerService.class); return (ClockEventManagerService) eventManagerServices.get(ClockEventManagerService.class);
} }
public TimerEventManagerService getTimerEventManagerService() { public TimerEventManagerService getTimerEventManagerService() {
return (TimerEventManagerService) services.get(TimerEventManagerService.class); return (TimerEventManagerService) eventManagerServices.get(TimerEventManagerService.class);
} }
public FileFollowEventManagerService getFileFollowEventManagerService() { public FileFollowEventManagerService getFileFollowEventManagerService() {
return (FileFollowEventManagerService) services.get(FileFollowEventManagerService.class); return (FileFollowEventManagerService) eventManagerServices.get(FileFollowEventManagerService.class);
} }
public PathEventManagerService getPathEventManagerService() { public PathEventManagerService getPathEventManagerService() {
return (PathEventManagerService) services.get(PathEventManagerService.class); return (PathEventManagerService) eventManagerServices.get(PathEventManagerService.class);
} }
public void close() throws IOException { public void close() throws IOException {
@ -103,7 +103,7 @@ public final class EventManager {
closeable.close(); closeable.close();
} }
} }
for (EventManagerService service : services.values()) { for (EventManagerService service : eventManagerServices.values()) {
try { try {
service.shutdown(); service.shutdown();
} catch (IOException e) { } catch (IOException e) {
@ -114,7 +114,9 @@ public final class EventManager {
public static class Builder { public static class Builder {
private final Settings settings; private Settings settings;
private int threadcount;
private ExecutorService executorService; private ExecutorService executorService;
@ -122,16 +124,25 @@ public final class EventManager {
private SubscriberExceptionHandler subscriberExceptionHandler; private SubscriberExceptionHandler subscriberExceptionHandler;
private AsyncEventBus eventBus; private EventBus eventBus;
private final List<EventConsumer> eventConsumers; private final List<EventConsumer> eventConsumers;
private Builder(Settings settings) { private Builder() {
this.settings = settings;
this.eventConsumers = new ArrayList<>(); this.eventConsumers = new ArrayList<>();
this.subscriberExceptionHandler = new EventManagerExceptionHandler(); this.subscriberExceptionHandler = new EventManagerExceptionHandler();
} }
public Builder setSettings(Settings settings) {
this.settings = settings;
return this;
}
public Builder setThreadCount(int threadCount) {
this.threadcount = threadCount;
return this;
}
public Builder setExecutorService(ExecutorService executorService) { public Builder setExecutorService(ExecutorService executorService) {
this.executorService = executorService; this.executorService = executorService;
return this; return this;
@ -142,7 +153,7 @@ public final class EventManager {
return this; return this;
} }
public Builder setEventBus(AsyncEventBus eventBus) { public Builder setEventBus(EventBus eventBus) {
this.eventBus = eventBus; this.eventBus = eventBus;
return this; return this;
} }
@ -169,8 +180,15 @@ public final class EventManager {
} }
public EventManager build() { public EventManager build() {
if (settings == null) {
settings = Settings.emptySettings();
}
if (executorService == null) { if (executorService == null) {
executorService = Executors.newFixedThreadPool(settings.getAsInt("event.threadcount", 2)); if (threadcount == 0) {
executorService = Executors.newCachedThreadPool();
} else {
executorService = Executors.newFixedThreadPool(threadcount);
}
} }
if (classLoader == null) { if (classLoader == null) {
classLoader = EventManager.class.getClassLoader(); classLoader = EventManager.class.getClassLoader();

View file

@ -0,0 +1 @@
org.xbib.event.net.http.HttpEventManagerService

View file

@ -0,0 +1,113 @@
package org.xbib.event.net.http.test;
import io.netty.bootstrap.Bootstrap;
import org.junit.jupiter.api.Test;
import org.xbib.event.common.EventManager;
import org.xbib.event.net.http.HttpEventManagerService;
import org.xbib.net.NetworkClass;
import org.xbib.net.URL;
import org.xbib.net.http.HttpAddress;
import org.xbib.net.http.HttpHeaderNames;
import org.xbib.net.http.HttpHeaderValues;
import org.xbib.net.http.HttpResponseStatus;
import org.xbib.net.http.client.netty.HttpRequest;
import org.xbib.net.http.client.netty.NettyHttpClient;
import org.xbib.net.http.client.netty.NettyHttpClientConfig;
import org.xbib.net.http.server.application.BaseApplication;
import org.xbib.net.http.server.domain.BaseHttpDomain;
import org.xbib.net.http.server.executor.BaseExecutor;
import org.xbib.net.http.server.executor.Executor;
import org.xbib.net.http.server.netty.NettyHttpServer;
import org.xbib.net.http.server.netty.NettyHttpServerConfig;
import org.xbib.net.http.server.route.BaseHttpRouter;
import org.xbib.net.http.server.route.HttpRouter;
import org.xbib.net.http.server.service.BaseHttpService;
import org.xbib.net.util.JsonUtil;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HttpEventTest {
private static final Logger logger = Logger.getLogger(HttpEventTest.class.getName());
@Test
public void testHttpEvent() throws Exception {
URL url = URL.from("http://localhost:8008/event/");
HttpAddress httpAddress1 = HttpAddress.http1(url);
NettyHttpServerConfig nettyHttpServerConfig = new NettyHttpServerConfig();
nettyHttpServerConfig.setServerName("NettyHttpServer",
Bootstrap.class.getPackage().getImplementationVersion());
nettyHttpServerConfig.setNetworkClass(NetworkClass.LOCAL);
nettyHttpServerConfig.setDebug(true);
EventManager eventManager = EventManager.builder()
.loadEventConsumers()
.build();
HttpEventManagerService httpEventManagerService = eventManager.getEventManagerService(HttpEventManagerService.class);
HttpRouter router = BaseHttpRouter.builder()
.addDomain(BaseHttpDomain.builder()
.setHttpAddress(httpAddress1)
.addService(BaseHttpService.builder()
.setPath("/event")
.setHandler(ctx -> {
ctx.status(HttpResponseStatus.OK)
.header(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN)
.charset(StandardCharsets.UTF_8)
.body(ctx.getRequest().asJson())
.done();
})
.build())
.build())
.build();
Executor executor = BaseExecutor.builder()
.build();
try (NettyHttpServer server = NettyHttpServer.builder()
.setHttpServerConfig(nettyHttpServerConfig)
.setApplication(BaseApplication.builder()
.setExecutor(executor)
.setRouter(router)
.build())
.build()) {
server.bind();
NettyHttpClientConfig config = new NettyHttpClientConfig()
.setDebug(true);
AtomicBoolean received = new AtomicBoolean();
try (NettyHttpClient client = NettyHttpClient.builder()
.setConfig(config)
.build()) {
HttpRequest request = HttpRequest.get()
.setURL(url)
.setResponseListener(resp -> {
String body = resp.getBodyAsChars(StandardCharsets.UTF_8).toString();
logger.log(Level.INFO, "got response:" +
" status = " + resp.getStatus() +
" header = " + resp.getHeaders() +
" body = " + body);
try {
Map<String, Object> map = JsonUtil.toMap(body);
org.xbib.net.http.server.netty.HttpRequest httpRequest = org.xbib.net.http.server.netty.HttpRequest.builder()
.parse(map).build();
logger.log(Level.INFO, "parsed http request = " + httpRequest.asJson());
} catch (IOException e) {
throw new RuntimeException(e);
}
received.set(true);
})
.build();
client.execute(request).get().close();
}
assertTrue(received.get());
}
}
}