add http events

main
Jörg Prante 5 months ago
parent 5c31d6e055
commit 99cc9e2c90

@ -1,9 +1,9 @@
dependencies { dependencies {
api project(':event-api') api project(':event-api')
api libs.settings.api api libs.settings.api
implementation libs.settings.datastructures.json
implementation libs.net implementation libs.net
implementation libs.time implementation libs.time
implementation libs.settings.datastructures.json
implementation libs.datastructures.common implementation libs.datastructures.common
implementation libs.datastructures.json.tiny implementation libs.datastructures.json.tiny
} }

@ -11,6 +11,7 @@ module org.xbib.event.common {
exports org.xbib.event.timer; exports org.xbib.event.timer;
exports org.xbib.event.wal; exports org.xbib.event.wal;
uses EventManagerService; uses EventManagerService;
uses org.xbib.event.EventConsumer;
requires org.xbib.event.api; requires org.xbib.event.api;
requires org.xbib.settings.api; requires org.xbib.settings.api;
requires org.xbib.settings.datastructures.json; requires org.xbib.settings.datastructures.json;

@ -1,24 +1,23 @@
package org.xbib.event.clock; package org.xbib.event.clock;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.EventBus;
import org.xbib.event.common.EventManager;
import org.xbib.event.common.EventManagerService; import org.xbib.event.common.EventManagerService;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
import org.xbib.time.schedule.CronExpression; import org.xbib.time.schedule.CronExpression;
import org.xbib.time.schedule.CronSchedule; import org.xbib.time.schedule.CronSchedule;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
public class ClockEventManagerService implements EventManagerService, Closeable { public class ClockEventManagerService implements EventManagerService {
private static final Logger logger = Logger.getLogger(ClockEventManagerService.class.getName()); private static final Logger logger = Logger.getLogger(ClockEventManagerService.class.getName());
@ -29,7 +28,10 @@ public class ClockEventManagerService implements EventManagerService, Closeable
public ClockEventManagerService() { public ClockEventManagerService() {
} }
public ClockEventManagerService init(Settings settings, EventBus eventBus, ClassLoader classLoader, ExecutorService executorService) { public ClockEventManagerService init(EventManager eventManager) {
Settings settings = eventManager.getSettings();
EventBus eventBus = eventManager.getEventBus();
ClassLoader classLoader = eventManager.getClassLoader();
this.suspended = new ArrayList<>(); this.suspended = new ArrayList<>();
ThreadFactory threadFactory = new ThreadFactory() { ThreadFactory threadFactory = new ThreadFactory() {
int n = 1; int n = 1;
@ -83,7 +85,7 @@ public class ClockEventManagerService implements EventManagerService, Closeable
} }
@Override @Override
public void close() throws IOException { public void shutdown() throws IOException {
cronSchedule.close(); cronSchedule.close();
} }
} }

@ -7,6 +7,7 @@ import java.util.ServiceLoader;
import org.xbib.event.Event; import org.xbib.event.Event;
import org.xbib.event.EventConsumer; import org.xbib.event.EventConsumer;
import org.xbib.event.bus.AsyncEventBus; import org.xbib.event.bus.AsyncEventBus;
import org.xbib.event.bus.EventBus;
import org.xbib.event.bus.SubscriberExceptionContext; import org.xbib.event.bus.SubscriberExceptionContext;
import org.xbib.event.bus.SubscriberExceptionHandler; import org.xbib.event.bus.SubscriberExceptionHandler;
import org.xbib.event.clock.ClockEventManagerService; import org.xbib.event.clock.ClockEventManagerService;
@ -37,22 +38,33 @@ public final class EventManager {
private EventManager(Builder builder) { private EventManager(Builder builder) {
this.builder = builder; this.builder = builder;
this.services = new HashMap<>(); this.services = new HashMap<>();
services.put(GenericEventManagerService.class, new GenericEventManagerService() services.put(GenericEventManagerService.class, new GenericEventManagerService().init(this));
.init(builder.settings, builder.eventBus, builder.classLoader, builder.executorService)); services.put(ClockEventManagerService.class, new ClockEventManagerService().init(this));
services.put(ClockEventManagerService.class, new ClockEventManagerService() services.put(TimerEventManagerService.class, new TimerEventManagerService().init(this));
.init(builder.settings, builder.eventBus, builder.classLoader, builder.executorService)); services.put(FileFollowEventManagerService.class, new FileFollowEventManagerService().init(this));
services.put(TimerEventManagerService.class, new TimerEventManagerService() services.put(PathEventManagerService.class, new PathEventManagerService().init(this));
.init(builder.settings, builder.eventBus, builder.classLoader, builder.executorService));
services.put(FileFollowEventManagerService.class, new FileFollowEventManagerService()
.init(builder.settings, builder.eventBus, builder.classLoader, builder.executorService));
services.put(PathEventManagerService.class, new PathEventManagerService()
.init(builder.settings, builder.eventBus, builder.classLoader, builder.executorService));
for (EventManagerService service : ServiceLoader.load(EventManagerService.class)) { for (EventManagerService service : ServiceLoader.load(EventManagerService.class)) {
services.put(service.getClass(), service.init(builder.settings, builder.eventBus, builder.classLoader, builder.executorService)); services.put(service.getClass(), service.init(this));
} }
logger.log(Level.INFO, "installed event service managers = " + services.keySet()); logger.log(Level.INFO, "installed event service managers = " + services.keySet());
} }
public Settings getSettings() {
return builder.settings;
}
public EventBus getEventBus() {
return builder.eventBus;
}
public ClassLoader getClassLoader() {
return builder.classLoader;
}
public ExecutorService getExecutorService() {
return builder.executorService;
}
public static Builder builder(Settings settings) { public static Builder builder(Settings settings) {
return new Builder(settings); return new Builder(settings);
} }
@ -92,8 +104,10 @@ public final class EventManager {
} }
} }
for (EventManagerService service : services.values()) { for (EventManagerService service : services.values()) {
if (service instanceof Closeable closeable) { try {
closeable.close(); service.shutdown();
} catch (IOException e) {
logger.log(Level.SEVERE, e.getMessage(), e);
} }
} }
} }

@ -1,11 +1,10 @@
package org.xbib.event.common; package org.xbib.event.common;
import org.xbib.event.bus.EventBus; import java.io.IOException;
import org.xbib.settings.Settings;
import java.util.concurrent.ExecutorService;
public interface EventManagerService { public interface EventManagerService {
EventManagerService init(Settings settings, EventBus eventBus, ClassLoader classLoader, ExecutorService executorService); EventManagerService init(EventManager eventManager);
void shutdown() throws IOException;
} }

@ -1,14 +1,14 @@
package org.xbib.event.generic; package org.xbib.event.generic;
import java.io.IOException;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.EventBus;
import org.xbib.event.bus.Subscriber; import org.xbib.event.bus.Subscriber;
import org.xbib.event.bus.SubscriberRegistry; import org.xbib.event.bus.SubscriberRegistry;
import org.xbib.event.common.EventManager;
import org.xbib.event.common.EventManagerService; import org.xbib.event.common.EventManagerService;
import org.xbib.settings.Settings;
public class GenericEventManagerService implements EventManagerService { public class GenericEventManagerService implements EventManagerService {
@ -18,11 +18,16 @@ public class GenericEventManagerService implements EventManagerService {
} }
@Override @Override
public GenericEventManagerService init(Settings settings, EventBus eventBus, ClassLoader classLoader, ExecutorService executorService) { public GenericEventManagerService init(EventManager eventManager) {
this.eventBus = eventBus; this.eventBus = eventManager.getEventBus();
return this; return this;
} }
@Override
public void shutdown() throws IOException {
}
public void post(Object event) { public void post(Object event) {
eventBus.post(event); eventBus.post(event);
} }

@ -1,6 +1,7 @@
package org.xbib.event.path; package org.xbib.event.path;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.EventBus;
import org.xbib.event.common.EventManager;
import org.xbib.event.common.EventManagerService; import org.xbib.event.common.EventManagerService;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
@ -17,7 +18,7 @@ import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import java.util.regex.Pattern; import java.util.regex.Pattern;
public class FileFollowEventManagerService implements EventManagerService, Closeable { public class FileFollowEventManagerService implements EventManagerService {
private static final Logger logger = Logger.getLogger(FileFollowEventManagerService.class.getName()); private static final Logger logger = Logger.getLogger(FileFollowEventManagerService.class.getName());
@ -28,10 +29,11 @@ public class FileFollowEventManagerService implements EventManagerService, Close
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public FileFollowEventManagerService init(Settings settings, public FileFollowEventManagerService init(EventManager eventManager) {
EventBus eventBus, Settings settings = eventManager.getSettings();
ClassLoader classLoader, EventBus eventBus = eventManager.getEventBus();
ExecutorService executorService) { ClassLoader classLoader = eventManager.getClassLoader();
ExecutorService executorService = eventManager.getExecutorService();
this.eventServiceMap = new LinkedHashMap<>(); this.eventServiceMap = new LinkedHashMap<>();
for (Map.Entry<String, Settings> entry : settings.getGroups("event.filefollow").entrySet()) { for (Map.Entry<String, Settings> entry : settings.getGroups("event.filefollow").entrySet()) {
Settings definition = entry.getValue(); Settings definition = entry.getValue();
@ -58,10 +60,14 @@ public class FileFollowEventManagerService implements EventManagerService, Close
} }
@Override @Override
public void close() throws IOException { public void shutdown() throws IOException {
for (Map.Entry<Future<?>, FileFollowEventService> entry : eventServiceMap.entrySet()) { for (Map.Entry<Future<?>, FileFollowEventService> entry : eventServiceMap.entrySet()) {
entry.getValue().setKeepWatching(false); try {
entry.getKey().cancel(true); entry.getValue().setKeepWatching(false);
entry.getKey().cancel(true);
} catch (Exception e) {
logger.log(Level.SEVERE, e.getMessage(), e);
}
} }
} }
} }

@ -3,10 +3,10 @@ package org.xbib.event.path;
import org.xbib.datastructures.api.TimeValue; import org.xbib.datastructures.api.TimeValue;
import org.xbib.datastructures.json.tiny.Json; import org.xbib.datastructures.json.tiny.Json;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.EventBus;
import org.xbib.event.common.EventManager;
import org.xbib.event.common.EventManagerService; import org.xbib.event.common.EventManagerService;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
import java.io.Writer; import java.io.Writer;
@ -23,7 +23,7 @@ import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import java.util.stream.Stream; import java.util.stream.Stream;
public class PathEventManagerService implements EventManagerService, Closeable { public class PathEventManagerService implements EventManagerService {
private static final Logger logger = Logger.getLogger(PathEventManagerService.class.getName()); private static final Logger logger = Logger.getLogger(PathEventManagerService.class.getName());
@ -42,12 +42,11 @@ public class PathEventManagerService implements EventManagerService, Closeable {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public PathEventManagerService init(Settings settings, public PathEventManagerService init(EventManager eventManager) {
EventBus eventBus, Settings settings = eventManager.getSettings();
ClassLoader classLoader, ClassLoader classLoader = eventManager.getClassLoader();
ExecutorService executorService) { this.eventBus = eventManager.getEventBus();
this.eventBus = eventBus; this.executorService = eventManager.getExecutorService();
this.executorService = executorService;
this.eventServiceMap = new LinkedHashMap<>(); this.eventServiceMap = new LinkedHashMap<>();
this.suspendedQueues = new ArrayList<>(); this.suspendedQueues = new ArrayList<>();
this.path = Paths.get(settings.get("event.pathevent.base", "/var/tmp/pathevent")); this.path = Paths.get(settings.get("event.pathevent.base", "/var/tmp/pathevent"));
@ -90,8 +89,8 @@ public class PathEventManagerService implements EventManagerService, Closeable {
} }
@Override @Override
public void close() throws IOException { public void shutdown() throws IOException {
logger.log(Level.INFO, "closing all path event services"); logger.log(Level.INFO, "shut down all path event services");
eventServiceMap.forEach((k, v) -> { eventServiceMap.forEach((k, v) -> {
k.cancel(true); k.cancel(true);
try { try {

@ -1,7 +1,7 @@
package org.xbib.event.timer; package org.xbib.event.timer;
import org.xbib.event.bus.AsyncEventBus;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.EventBus;
import org.xbib.event.common.EventManager;
import org.xbib.event.common.EventManagerService; import org.xbib.event.common.EventManagerService;
import org.xbib.event.persistence.FilePersistenceStore; import org.xbib.event.persistence.FilePersistenceStore;
import org.xbib.event.persistence.PersistenceStore; import org.xbib.event.persistence.PersistenceStore;
@ -9,7 +9,6 @@ import org.xbib.settings.Settings;
import org.xbib.time.chronic.Chronic; import org.xbib.time.chronic.Chronic;
import org.xbib.time.chronic.Span; import org.xbib.time.chronic.Span;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.text.ParseException; import java.text.ParseException;
import java.time.Instant; import java.time.Instant;
@ -17,12 +16,10 @@ import java.time.ZoneId;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
public class TimerEventManagerService implements EventManagerService, Closeable { public class TimerEventManagerService implements EventManagerService {
private static final Logger logger = Logger.getLogger(TimerEventManagerService.class.getName()); private static final Logger logger = Logger.getLogger(TimerEventManagerService.class.getName());
@ -31,13 +28,12 @@ public class TimerEventManagerService implements EventManagerService, Closeable
public TimerEventManagerService() { public TimerEventManagerService() {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public TimerEventManagerService init(Settings settings, public TimerEventManagerService init(EventManager eventManager) {
EventBus eventBus, Settings settings = eventManager.getSettings();
ClassLoader classLoader, EventBus eventBus = eventManager.getEventBus();
ExecutorService executorService) { ClassLoader classLoader = eventManager.getClassLoader();
this.services = new LinkedHashMap<>(); this.services = new LinkedHashMap<>();
for (Map.Entry<String, Settings> entry : settings.getGroups("event.timer").entrySet()) { for (Map.Entry<String, Settings> entry : settings.getGroups("event.timer").entrySet()) {
String name = entry.getKey(); String name = entry.getKey();
@ -95,7 +91,7 @@ public class TimerEventManagerService implements EventManagerService, Closeable
} }
@Override @Override
public void close() throws IOException { public void shutdown() throws IOException {
for (Map.Entry<String, TimerEventService> entry : services.entrySet()) { for (Map.Entry<String, TimerEventService> entry : services.entrySet()) {
logger.log(Level.INFO, "closing timer " + entry.getKey()); logger.log(Level.INFO, "closing timer " + entry.getKey());
entry.getValue().close(); entry.getValue().close();

@ -2,5 +2,6 @@ dependencies {
api project(':event-common') api project(':event-common')
api libs.net.http.server.netty api libs.net.http.server.netty
api libs.net.http.client.netty api libs.net.http.client.netty
implementation libs.settings.datastructures.json
implementation libs.datastructures.json.tiny implementation libs.datastructures.json.tiny
} }

@ -1,5 +1,9 @@
import org.xbib.event.common.EventManagerService;
import org.xbib.event.net.http.HttpEventManagerService;
module org.xbib.event.net.http { module org.xbib.event.net.http {
exports org.xbib.event.net.http; exports org.xbib.event.net.http;
provides EventManagerService with HttpEventManagerService;
requires org.xbib.event.api; requires org.xbib.event.api;
requires org.xbib.event.common; requires org.xbib.event.common;
requires org.xbib.net.http; requires org.xbib.net.http;
@ -9,5 +13,6 @@ module org.xbib.event.net.http {
requires org.xbib.net.http.server; requires org.xbib.net.http.server;
requires org.xbib.net.http.server.netty; requires org.xbib.net.http.server.netty;
requires org.xbib.net.http.server.netty.secure; requires org.xbib.net.http.server.netty.secure;
requires org.xbib.settings.api;
requires java.logging; requires java.logging;
} }

@ -0,0 +1,21 @@
package org.xbib.event.net.http;
import java.io.IOException;
import org.xbib.event.common.EventManager;
import org.xbib.event.common.EventManagerService;
public class HttpEventManagerService implements EventManagerService {
public HttpEventManagerService() {
}
@Override
public EventManagerService init(EventManager eventManager) {
return null;
}
@Override
public void shutdown() throws IOException {
}
}

@ -13,11 +13,11 @@ import java.nio.charset.StandardCharsets;
import static org.xbib.net.http.HttpResponseStatus.NOT_FOUND; import static org.xbib.net.http.HttpResponseStatus.NOT_FOUND;
import static org.xbib.net.http.HttpResponseStatus.OK; import static org.xbib.net.http.HttpResponseStatus.OK;
public class EventReceiverService { public class HttpEventReceiverService {
private final EventManager eventManager; private final EventManager eventManager;
public EventReceiverService(EventManager eventManager) { public HttpEventReceiverService(EventManager eventManager) {
this.eventManager = eventManager; this.eventManager = eventManager;
} }

@ -0,0 +1,18 @@
package org.xbib.event.net.http;
import org.xbib.event.common.EventManager;
public class HttpEventSubmitterService {
private final EventManager eventManager;
public HttpEventSubmitterService(EventManager eventManager) {
this.eventManager = eventManager;
}
public void submit(Object event) {
}
}

@ -1,19 +1,19 @@
package org.xbib.event.syslog; package org.xbib.event.syslog;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.EventBus;
import org.xbib.event.common.EventManager;
import org.xbib.event.common.EventManagerService; import org.xbib.event.common.EventManagerService;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
public class SyslogEventManagerService implements EventManagerService, Closeable { public class SyslogEventManagerService implements EventManagerService {
private static final Logger logger = Logger.getLogger(SyslogEventManagerService.class.getName()); private static final Logger logger = Logger.getLogger(SyslogEventManagerService.class.getName());
private List<SyslogService> syslogServices; private List<SyslogService> syslogServices;
@ -21,10 +21,9 @@ public class SyslogEventManagerService implements EventManagerService, Closeable
public SyslogEventManagerService() {} public SyslogEventManagerService() {}
@Override @Override
public SyslogEventManagerService init(Settings settings, public SyslogEventManagerService init(EventManager eventManager) {
EventBus eventBus, Settings settings = eventManager.getSettings();
ClassLoader classLoader, EventBus eventBus = eventManager.getEventBus();
ExecutorService executorService) {
this.syslogServices = new ArrayList<>(); this.syslogServices = new ArrayList<>();
for (Map.Entry<String, Settings> entry : settings.getGroups("event.syslog").entrySet()) { for (Map.Entry<String, Settings> entry : settings.getGroups("event.syslog").entrySet()) {
Settings definition = entry.getValue(); Settings definition = entry.getValue();
@ -42,7 +41,7 @@ public class SyslogEventManagerService implements EventManagerService, Closeable
} }
@Override @Override
public void close() throws IOException { public void shutdown() throws IOException {
for (SyslogService syslogService : syslogServices) { for (SyslogService syslogService : syslogServices) {
syslogService.close(); syslogService.close();
} }

Loading…
Cancel
Save