refactoring to event manager services, new subproject for syslog with netty dependencies
This commit is contained in:
parent
120a9e915d
commit
09a7a95b25
44 changed files with 190 additions and 123 deletions
|
@ -6,5 +6,4 @@ dependencies {
|
||||||
implementation libs.time
|
implementation libs.time
|
||||||
implementation libs.datastructures.common
|
implementation libs.datastructures.common
|
||||||
implementation libs.datastructures.json.tiny
|
implementation libs.datastructures.json.tiny
|
||||||
implementation libs.netty.handler
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
import org.xbib.event.common.EventManagerService;
|
||||||
|
|
||||||
module org.xbib.event.common {
|
module org.xbib.event.common {
|
||||||
exports org.xbib.event.bus;
|
exports org.xbib.event.bus;
|
||||||
exports org.xbib.event.clock;
|
exports org.xbib.event.clock;
|
||||||
|
@ -6,9 +8,9 @@ module org.xbib.event.common {
|
||||||
exports org.xbib.event.log;
|
exports org.xbib.event.log;
|
||||||
exports org.xbib.event.path;
|
exports org.xbib.event.path;
|
||||||
exports org.xbib.event.persistence;
|
exports org.xbib.event.persistence;
|
||||||
exports org.xbib.event.syslog;
|
|
||||||
exports org.xbib.event.timer;
|
exports org.xbib.event.timer;
|
||||||
exports org.xbib.event.wal;
|
exports org.xbib.event.wal;
|
||||||
|
uses EventManagerService;
|
||||||
requires org.xbib.event.api;
|
requires org.xbib.event.api;
|
||||||
requires org.xbib.settings.api;
|
requires org.xbib.settings.api;
|
||||||
requires org.xbib.settings.datastructures.json;
|
requires org.xbib.settings.datastructures.json;
|
||||||
|
@ -16,10 +18,5 @@ module org.xbib.event.common {
|
||||||
requires org.xbib.time;
|
requires org.xbib.time;
|
||||||
requires org.xbib.datastructures.common;
|
requires org.xbib.datastructures.common;
|
||||||
requires org.xbib.datastructures.json.tiny;
|
requires org.xbib.datastructures.json.tiny;
|
||||||
requires io.netty.buffer;
|
|
||||||
requires io.netty.common;
|
|
||||||
requires io.netty.transport;
|
|
||||||
requires io.netty.handler;
|
|
||||||
requires io.netty.codec;
|
|
||||||
requires java.logging;
|
requires java.logging;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
package org.xbib.event.clock;
|
package org.xbib.event.clock;
|
||||||
|
|
||||||
import org.xbib.event.bus.AsyncEventBus;
|
|
||||||
import org.xbib.event.bus.EventBus;
|
import org.xbib.event.bus.EventBus;
|
||||||
|
import org.xbib.event.common.EventManagerService;
|
||||||
import org.xbib.settings.Settings;
|
import org.xbib.settings.Settings;
|
||||||
import org.xbib.time.schedule.CronExpression;
|
import org.xbib.time.schedule.CronExpression;
|
||||||
import org.xbib.time.schedule.CronSchedule;
|
import org.xbib.time.schedule.CronSchedule;
|
||||||
|
@ -11,27 +11,25 @@ import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
public class ClockEventManager implements Closeable {
|
public class ClockEventManagerService implements EventManagerService, Closeable {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(ClockEventManager.class.getName());
|
private static final Logger logger = Logger.getLogger(ClockEventManagerService.class.getName());
|
||||||
|
|
||||||
private final CronSchedule<Integer> cronSchedule;
|
private CronSchedule<Integer> cronSchedule;
|
||||||
|
|
||||||
private final List<String> suspended;
|
private List<String> suspended;
|
||||||
|
|
||||||
public ClockEventManager(Settings settings) {
|
public ClockEventManagerService() {
|
||||||
this(settings, new AsyncEventBus(Executors.newSingleThreadExecutor()), ClockEventManager.class.getClassLoader());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClockEventManager(Settings settings,
|
public ClockEventManagerService init(Settings settings, EventBus eventBus, ClassLoader classLoader, ExecutorService executorService) {
|
||||||
EventBus eventBus,
|
|
||||||
ClassLoader classLoader) {
|
|
||||||
this.suspended = new ArrayList<>();
|
this.suspended = new ArrayList<>();
|
||||||
ThreadFactory threadFactory = new ThreadFactory() {
|
ThreadFactory threadFactory = new ThreadFactory() {
|
||||||
int n = 1;
|
int n = 1;
|
||||||
|
@ -40,9 +38,9 @@ public class ClockEventManager implements Closeable {
|
||||||
return new Thread(r, "clock-event-manager-" + (n++));
|
return new Thread(r, "clock-event-manager-" + (n++));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
ScheduledExecutorService executorService =
|
ScheduledExecutorService scheduledExecutorService =
|
||||||
Executors.newScheduledThreadPool(settings.getAsInt("pool.size", 2), threadFactory);
|
Executors.newScheduledThreadPool(settings.getAsInt("pool.size", 2), threadFactory);
|
||||||
this.cronSchedule = new CronSchedule<>(executorService);
|
this.cronSchedule = new CronSchedule<>(scheduledExecutorService);
|
||||||
for (Map.Entry<String, Settings> clockEventService : settings.getGroups("event.clock").entrySet()) {
|
for (Map.Entry<String, Settings> clockEventService : settings.getGroups("event.clock").entrySet()) {
|
||||||
String name = clockEventService.getKey();
|
String name = clockEventService.getKey();
|
||||||
Settings entrySettings = clockEventService.getValue();
|
Settings entrySettings = clockEventService.getValue();
|
||||||
|
@ -69,6 +67,7 @@ public class ClockEventManager implements Closeable {
|
||||||
}
|
}
|
||||||
logger.log(Level.INFO, "entries = " + cronSchedule.getEntries());
|
logger.log(Level.INFO, "entries = " + cronSchedule.getEntries());
|
||||||
cronSchedule.start();
|
cronSchedule.start();
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<String> getSuspended() {
|
public List<String> getSuspended() {
|
|
@ -10,7 +10,7 @@ public class ClockEventService implements Callable<Integer> {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(ClockEventService.class.getName());
|
private static final Logger logger = Logger.getLogger(ClockEventService.class.getName());
|
||||||
|
|
||||||
private final ClockEventManager manager;
|
private final ClockEventManagerService manager;
|
||||||
|
|
||||||
private final EventBus eventBus;
|
private final EventBus eventBus;
|
||||||
|
|
||||||
|
@ -18,7 +18,7 @@ public class ClockEventService implements Callable<Integer> {
|
||||||
|
|
||||||
private final Class<? extends ClockEvent> eventClass;
|
private final Class<? extends ClockEvent> eventClass;
|
||||||
|
|
||||||
public ClockEventService(ClockEventManager manager,
|
public ClockEventService(ClockEventManagerService manager,
|
||||||
EventBus eventBus,
|
EventBus eventBus,
|
||||||
String name,
|
String name,
|
||||||
Class<? extends ClockEvent> eventClass) {
|
Class<? extends ClockEvent> eventClass) {
|
||||||
|
|
|
@ -1,22 +1,22 @@
|
||||||
package org.xbib.event.common;
|
package org.xbib.event.common;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
import java.util.ServiceLoader;
|
import java.util.ServiceLoader;
|
||||||
|
|
||||||
import org.xbib.event.EventConsumer;
|
import org.xbib.event.EventConsumer;
|
||||||
import org.xbib.event.bus.AsyncEventBus;
|
import org.xbib.event.bus.AsyncEventBus;
|
||||||
import org.xbib.event.bus.SubscriberExceptionContext;
|
import org.xbib.event.bus.SubscriberExceptionContext;
|
||||||
import org.xbib.event.bus.SubscriberExceptionHandler;
|
import org.xbib.event.bus.SubscriberExceptionHandler;
|
||||||
import org.xbib.event.clock.ClockEventManager;
|
import org.xbib.event.clock.ClockEventManagerService;
|
||||||
import org.xbib.event.generic.GenericEventManager;
|
import org.xbib.event.generic.GenericEventManagerService;
|
||||||
import org.xbib.event.path.FileFollowEventManager;
|
import org.xbib.event.path.FileFollowEventManagerService;
|
||||||
import org.xbib.event.path.PathEventManager;
|
import org.xbib.event.path.PathEventManagerService;
|
||||||
import org.xbib.event.syslog.SyslogEventManager;
|
import org.xbib.event.timer.TimerEventManagerService;
|
||||||
import org.xbib.event.timer.TimerEventManager;
|
|
||||||
import org.xbib.settings.Settings;
|
import org.xbib.settings.Settings;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.time.ZoneId;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
@ -31,54 +31,53 @@ public final class EventManager {
|
||||||
|
|
||||||
private final Builder builder;
|
private final Builder builder;
|
||||||
|
|
||||||
private final GenericEventManager genericEventManager;
|
private final Map<Class<? extends EventManagerService>, EventManagerService> services;
|
||||||
|
|
||||||
private final ClockEventManager clockEventManager;
|
|
||||||
|
|
||||||
private final TimerEventManager timerEventManager;
|
|
||||||
|
|
||||||
private final FileFollowEventManager fileFollowEventManager;
|
|
||||||
|
|
||||||
private final PathEventManager pathEventManager;
|
|
||||||
|
|
||||||
private final SyslogEventManager syslogEventManager;
|
|
||||||
|
|
||||||
private EventManager(Builder builder) {
|
private EventManager(Builder builder) {
|
||||||
this.builder = builder;
|
this.builder = builder;
|
||||||
this.genericEventManager = new GenericEventManager(builder.eventBus);
|
this.services = new HashMap<>();
|
||||||
this.clockEventManager = new ClockEventManager(builder.settings, builder.eventBus, builder.classLoader);
|
services.put(GenericEventManagerService.class, new GenericEventManagerService()
|
||||||
this.timerEventManager = new TimerEventManager(builder.settings, builder.eventBus, builder.classLoader, ZoneId.systemDefault());
|
.init(builder.settings, builder.eventBus, builder.classLoader, builder.executorService));
|
||||||
this.fileFollowEventManager = new FileFollowEventManager(builder.settings, builder.eventBus, builder.executorService, builder.classLoader);
|
services.put(ClockEventManagerService.class, new ClockEventManagerService()
|
||||||
this.pathEventManager = new PathEventManager(builder.settings, builder.eventBus, builder.executorService, builder.classLoader);
|
.init(builder.settings, builder.eventBus, builder.classLoader, builder.executorService));
|
||||||
this.syslogEventManager = new SyslogEventManager(builder.settings, builder.eventBus);
|
services.put(TimerEventManagerService.class, new TimerEventManagerService()
|
||||||
|
.init(builder.settings, builder.eventBus, builder.classLoader, builder.executorService));
|
||||||
|
services.put(FileFollowEventManagerService.class, new FileFollowEventManagerService()
|
||||||
|
.init(builder.settings, builder.eventBus, builder.classLoader, builder.executorService));
|
||||||
|
services.put(PathEventManagerService.class, new PathEventManagerService()
|
||||||
|
.init(builder.settings, builder.eventBus, builder.classLoader, builder.executorService));
|
||||||
|
for (EventManagerService service : ServiceLoader.load(EventManagerService.class)) {
|
||||||
|
services.put(service.getClass(), service.init(builder.settings, builder.eventBus, builder.classLoader, builder.executorService));
|
||||||
|
}
|
||||||
|
logger.log(Level.INFO, "installed event service managers = " + services.keySet());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Builder builder(Settings settings) {
|
public static Builder builder(Settings settings) {
|
||||||
return new Builder(settings);
|
return new Builder(settings);
|
||||||
}
|
}
|
||||||
|
|
||||||
public GenericEventManager getGenericEventManager() {
|
public EventManagerService getEventManagerService(Class<? extends EventManagerService> cl) {
|
||||||
return genericEventManager;
|
return services.get(cl);
|
||||||
}
|
}
|
||||||
|
|
||||||
public ClockEventManager getClockEventManager() {
|
public GenericEventManagerService getGenericEventManagerService() {
|
||||||
return clockEventManager;
|
return (GenericEventManagerService) services.get(GenericEventManagerService.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
public TimerEventManager getTimerEventManager() {
|
public ClockEventManagerService getClockEventManagerService() {
|
||||||
return timerEventManager;
|
return (ClockEventManagerService) services.get(ClockEventManagerService.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
public FileFollowEventManager getFileFollowEventManager() {
|
public TimerEventManagerService getTimerEventManagerService() {
|
||||||
return fileFollowEventManager;
|
return (TimerEventManagerService) services.get(TimerEventManagerService.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
public PathEventManager getPathEventManager() {
|
public FileFollowEventManagerService getFileFollowEventManagerService() {
|
||||||
return pathEventManager;
|
return (FileFollowEventManagerService) services.get(FileFollowEventManagerService.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
public SyslogEventManager getSyslogEventManager() {
|
public PathEventManagerService getPathEventManagerService() {
|
||||||
return syslogEventManager;
|
return (PathEventManagerService) services.get(PathEventManagerService.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
|
@ -87,11 +86,11 @@ public final class EventManager {
|
||||||
closeable.close();
|
closeable.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
clockEventManager.close();
|
for (EventManagerService service : services.values()) {
|
||||||
timerEventManager.close();
|
if (service instanceof Closeable closeable) {
|
||||||
fileFollowEventManager.close();
|
closeable.close();
|
||||||
pathEventManager.close();
|
}
|
||||||
syslogEventManager.close();
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Builder {
|
public static class Builder {
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
package org.xbib.event.common;
|
||||||
|
|
||||||
|
import org.xbib.event.bus.EventBus;
|
||||||
|
import org.xbib.settings.Settings;
|
||||||
|
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
|
public interface EventManagerService {
|
||||||
|
|
||||||
|
EventManagerService init(Settings settings, EventBus eventBus, ClassLoader classLoader, ExecutorService executorService);
|
||||||
|
}
|
|
@ -2,17 +2,26 @@ package org.xbib.event.generic;
|
||||||
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
|
||||||
import org.xbib.event.bus.AsyncEventBus;
|
import org.xbib.event.bus.AsyncEventBus;
|
||||||
|
import org.xbib.event.bus.EventBus;
|
||||||
import org.xbib.event.bus.Subscriber;
|
import org.xbib.event.bus.Subscriber;
|
||||||
import org.xbib.event.bus.SubscriberRegistry;
|
import org.xbib.event.bus.SubscriberRegistry;
|
||||||
|
import org.xbib.event.common.EventManagerService;
|
||||||
|
import org.xbib.settings.Settings;
|
||||||
|
|
||||||
public class GenericEventManager {
|
public class GenericEventManagerService implements EventManagerService {
|
||||||
|
|
||||||
private final AsyncEventBus eventBus;
|
private EventBus eventBus;
|
||||||
|
|
||||||
public GenericEventManager(AsyncEventBus eventBus) {
|
public GenericEventManagerService() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GenericEventManagerService init(Settings settings, EventBus eventBus, ClassLoader classLoader, ExecutorService executorService) {
|
||||||
this.eventBus = eventBus;
|
this.eventBus = eventBus;
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void post(Object event) {
|
public void post(Object event) {
|
|
@ -1,6 +1,7 @@
|
||||||
package org.xbib.event.path;
|
package org.xbib.event.path;
|
||||||
|
|
||||||
import org.xbib.event.bus.AsyncEventBus;
|
import org.xbib.event.bus.EventBus;
|
||||||
|
import org.xbib.event.common.EventManagerService;
|
||||||
import org.xbib.settings.Settings;
|
import org.xbib.settings.Settings;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
|
@ -16,17 +17,21 @@ import java.util.logging.Level;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
public class FileFollowEventManager implements Closeable {
|
public class FileFollowEventManagerService implements EventManagerService, Closeable {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(FileFollowEventManager.class.getName());
|
private static final Logger logger = Logger.getLogger(FileFollowEventManagerService.class.getName());
|
||||||
|
|
||||||
private final Map<Future<?>, FileFollowEventService> eventServiceMap;
|
private Map<Future<?>, FileFollowEventService> eventServiceMap;
|
||||||
|
|
||||||
|
public FileFollowEventManagerService() {
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public FileFollowEventManager(Settings settings,
|
@Override
|
||||||
AsyncEventBus eventBus,
|
public FileFollowEventManagerService init(Settings settings,
|
||||||
ExecutorService executorService,
|
EventBus eventBus,
|
||||||
ClassLoader classLoader) {
|
ClassLoader classLoader,
|
||||||
|
ExecutorService executorService) {
|
||||||
this.eventServiceMap = new LinkedHashMap<>();
|
this.eventServiceMap = new LinkedHashMap<>();
|
||||||
for (Map.Entry<String, Settings> entry : settings.getGroups("event.filefollow").entrySet()) {
|
for (Map.Entry<String, Settings> entry : settings.getGroups("event.filefollow").entrySet()) {
|
||||||
Settings definition = entry.getValue();
|
Settings definition = entry.getValue();
|
||||||
|
@ -49,6 +54,7 @@ public class FileFollowEventManager implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
|
@ -3,6 +3,7 @@ package org.xbib.event.path;
|
||||||
import org.xbib.datastructures.api.TimeValue;
|
import org.xbib.datastructures.api.TimeValue;
|
||||||
import org.xbib.datastructures.json.tiny.Json;
|
import org.xbib.datastructures.json.tiny.Json;
|
||||||
import org.xbib.event.bus.EventBus;
|
import org.xbib.event.bus.EventBus;
|
||||||
|
import org.xbib.event.common.EventManagerService;
|
||||||
import org.xbib.settings.Settings;
|
import org.xbib.settings.Settings;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
|
@ -22,25 +23,29 @@ import java.util.logging.Level;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
public class PathEventManager implements Closeable {
|
public class PathEventManagerService implements EventManagerService, Closeable {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(PathEventManager.class.getName());
|
private static final Logger logger = Logger.getLogger(PathEventManagerService.class.getName());
|
||||||
|
|
||||||
private final EventBus eventBus;
|
private EventBus eventBus;
|
||||||
|
|
||||||
private final ExecutorService executorService;
|
private ExecutorService executorService;
|
||||||
|
|
||||||
private final Path path;
|
private Path path;
|
||||||
|
|
||||||
private final Map<Future<?>, PathEventService> eventServiceMap;
|
private Map<Future<?>, PathEventService> eventServiceMap;
|
||||||
|
|
||||||
private final List<String> suspendedQueues;
|
private List<String> suspendedQueues;
|
||||||
|
|
||||||
|
public PathEventManagerService() {
|
||||||
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public PathEventManager(Settings settings,
|
@Override
|
||||||
|
public PathEventManagerService init(Settings settings,
|
||||||
EventBus eventBus,
|
EventBus eventBus,
|
||||||
ExecutorService executorService,
|
ClassLoader classLoader,
|
||||||
ClassLoader classLoader) {
|
ExecutorService executorService) {
|
||||||
this.eventBus = eventBus;
|
this.eventBus = eventBus;
|
||||||
this.executorService = executorService;
|
this.executorService = executorService;
|
||||||
this.eventServiceMap = new LinkedHashMap<>();
|
this.eventServiceMap = new LinkedHashMap<>();
|
||||||
|
@ -64,6 +69,7 @@ public class PathEventManager implements Closeable {
|
||||||
logger.log(Level.SEVERE, "unable to create event path service " + entry.getKey() + ", reason " + e.getMessage(), e);
|
logger.log(Level.SEVERE, "unable to create event path service " + entry.getKey() + ", reason " + e.getMessage(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void createPathEventService(String name,
|
public void createPathEventService(String name,
|
|
@ -32,7 +32,7 @@ public class PathEventService implements Callable<Integer>, Closeable {
|
||||||
|
|
||||||
public static final String FAIL = "fail";
|
public static final String FAIL = "fail";
|
||||||
|
|
||||||
private final PathEventManager pathEventManager;
|
private final PathEventManagerService pathEventManager;
|
||||||
|
|
||||||
private final EventBus eventBus;
|
private final EventBus eventBus;
|
||||||
|
|
||||||
|
@ -52,7 +52,7 @@ public class PathEventService implements Callable<Integer>, Closeable {
|
||||||
|
|
||||||
private volatile boolean keepWatching;
|
private volatile boolean keepWatching;
|
||||||
|
|
||||||
public PathEventService(PathEventManager pathEventManager,
|
public PathEventService(PathEventManagerService pathEventManager,
|
||||||
EventBus eventBus,
|
EventBus eventBus,
|
||||||
String name,
|
String name,
|
||||||
Path path,
|
Path path,
|
||||||
|
|
|
@ -2,6 +2,7 @@ package org.xbib.event.timer;
|
||||||
|
|
||||||
import org.xbib.event.bus.AsyncEventBus;
|
import org.xbib.event.bus.AsyncEventBus;
|
||||||
import org.xbib.event.bus.EventBus;
|
import org.xbib.event.bus.EventBus;
|
||||||
|
import org.xbib.event.common.EventManagerService;
|
||||||
import org.xbib.event.persistence.FilePersistenceStore;
|
import org.xbib.event.persistence.FilePersistenceStore;
|
||||||
import org.xbib.event.persistence.PersistenceStore;
|
import org.xbib.event.persistence.PersistenceStore;
|
||||||
import org.xbib.settings.Settings;
|
import org.xbib.settings.Settings;
|
||||||
|
@ -16,26 +17,27 @@ import java.time.ZoneId;
|
||||||
import java.time.ZonedDateTime;
|
import java.time.ZonedDateTime;
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
public class TimerEventManager implements Closeable {
|
public class TimerEventManagerService implements EventManagerService, Closeable {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(TimerEventManager.class.getName());
|
private static final Logger logger = Logger.getLogger(TimerEventManagerService.class.getName());
|
||||||
|
|
||||||
private final Map<String, TimerEventService> services;
|
private Map<String, TimerEventService> services;
|
||||||
|
|
||||||
public TimerEventManager(Settings settings) {
|
public TimerEventManagerService() {
|
||||||
this(settings, new AsyncEventBus(Executors.newSingleThreadExecutor()),
|
|
||||||
TimerEventManager.class.getClassLoader(), ZoneId.systemDefault());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public TimerEventManager(Settings settings,
|
@Override
|
||||||
|
public TimerEventManagerService init(Settings settings,
|
||||||
EventBus eventBus,
|
EventBus eventBus,
|
||||||
ClassLoader classLoader,
|
ClassLoader classLoader,
|
||||||
ZoneId zoneId) {
|
ExecutorService executorService) {
|
||||||
this.services = new LinkedHashMap<>();
|
this.services = new LinkedHashMap<>();
|
||||||
for (Map.Entry<String, Settings> entry : settings.getGroups("event.timer").entrySet()) {
|
for (Map.Entry<String, Settings> entry : settings.getGroups("event.timer").entrySet()) {
|
||||||
String name = entry.getKey();
|
String name = entry.getKey();
|
||||||
|
@ -44,38 +46,43 @@ public class TimerEventManager implements Closeable {
|
||||||
try {
|
try {
|
||||||
Class<? extends TimerEvent> eventClass = (Class<? extends TimerEvent>) classLoader.loadClass(className);
|
Class<? extends TimerEvent> eventClass = (Class<? extends TimerEvent>) classLoader.loadClass(className);
|
||||||
PersistenceStore<String, Object> persistenceStore = new FilePersistenceStore(timerSettings, name);
|
PersistenceStore<String, Object> persistenceStore = new FilePersistenceStore(timerSettings, name);
|
||||||
services.put(name, new TimerEventService(eventBus, name, eventClass, zoneId, persistenceStore));
|
services.put(name, new TimerEventService(eventBus, name, eventClass, ZoneId.systemDefault(), persistenceStore));
|
||||||
logger.log(Level.INFO, "timer " + name + " active for timer event class " + className);
|
logger.log(Level.INFO, "timer " + name + " active for timer event class " + className);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.log(Level.WARNING, "unable to activate timer " + name + ", reason: " + e.getMessage(), e);
|
logger.log(Level.WARNING, "unable to activate timer " + name + ", reason: " + e.getMessage(), e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean put(String key, String timeSpec, Map<String,Object> map) throws ParseException, IOException {
|
public boolean put(String service,
|
||||||
if (services.containsKey(key)) {
|
String timeSpec,
|
||||||
|
Map<String, Object> map) throws ParseException, IOException {
|
||||||
|
if (services.containsKey(service)) {
|
||||||
Span span = Chronic.parse(timeSpec);
|
Span span = Chronic.parse(timeSpec);
|
||||||
if (span != null) {
|
if (span != null) {
|
||||||
ZonedDateTime zonedDateTime = span.getBeginCalendar();
|
ZonedDateTime zonedDateTime = span.getBeginCalendar();
|
||||||
services.get(key).schedule(zonedDateTime.toInstant(), map);
|
services.get(service).schedule(zonedDateTime.toInstant(), map);
|
||||||
logger.log(Level.INFO, "scheduled to " + zonedDateTime);
|
logger.log(Level.INFO, "scheduled to " + zonedDateTime);
|
||||||
} else {
|
} else {
|
||||||
logger.log(Level.INFO, "timer event key " + key + ": can not understand time spec " + timeSpec);
|
logger.log(Level.INFO, "timer event key " + service + ": can not understand time spec " + timeSpec);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
logger.log(Level.SEVERE, "unknown timer event key: " + key);
|
logger.log(Level.SEVERE, "unknown timer event key: " + service);
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean put(String key, Instant instant, Map<String,Object> map) throws IOException {
|
public boolean put(String service,
|
||||||
if (services.containsKey(key)) {
|
Instant instant,
|
||||||
services.get(key).schedule(instant, map);
|
Map<String, Object> map) throws IOException {
|
||||||
|
if (services.containsKey(service)) {
|
||||||
|
services.get(service).schedule(instant, map);
|
||||||
return true;
|
return true;
|
||||||
} else {
|
} else {
|
||||||
logger.log(Level.SEVERE, "unknown timer event key: " + key);
|
logger.log(Level.SEVERE, "unknown timer event key: " + service);
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
|
@ -24,7 +24,7 @@ public class EventManagerTest {
|
||||||
EventManager eventManager = EventManager.builder(settings)
|
EventManager eventManager = EventManager.builder(settings)
|
||||||
.register(consumer)
|
.register(consumer)
|
||||||
.build();
|
.build();
|
||||||
eventManager.getGenericEventManager().post(new DefaultGenericEvent(e -> {
|
eventManager.getGenericEventManagerService().post(new DefaultGenericEvent(e -> {
|
||||||
logger.log(Level.INFO, "received event " + e);
|
logger.log(Level.INFO, "received event " + e);
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
@ -39,7 +39,7 @@ public class EventManagerTest {
|
||||||
.register(consumer)
|
.register(consumer)
|
||||||
.build();
|
.build();
|
||||||
CompletableFuture<GenericEvent> future = new CompletableFuture<>();
|
CompletableFuture<GenericEvent> future = new CompletableFuture<>();
|
||||||
eventManager.getGenericEventManager().post(new DefaultGenericEvent(e -> {
|
eventManager.getGenericEventManagerService().post(new DefaultGenericEvent(e -> {
|
||||||
logger.log(Level.INFO, "received event " + e);
|
logger.log(Level.INFO, "received event " + e);
|
||||||
future.complete(e);
|
future.complete(e);
|
||||||
}));
|
}));
|
||||||
|
@ -59,7 +59,7 @@ public class EventManagerTest {
|
||||||
.loadEventConsumers()
|
.loadEventConsumers()
|
||||||
.build();
|
.build();
|
||||||
CompletableFuture<GenericEvent> future = new CompletableFuture<>();
|
CompletableFuture<GenericEvent> future = new CompletableFuture<>();
|
||||||
eventManager.getGenericEventManager().post(new DefaultGenericEvent(e -> {
|
eventManager.getGenericEventManagerService().post(new DefaultGenericEvent(e -> {
|
||||||
logger.log(Level.INFO, "received event " + e);
|
logger.log(Level.INFO, "received event " + e);
|
||||||
}), future);
|
}), future);
|
||||||
GenericEvent e = future.get();
|
GenericEvent e = future.get();
|
||||||
|
|
|
@ -19,8 +19,7 @@ public class ClockEventManagerTest {
|
||||||
EventManager eventManager = EventManager.builder(settings)
|
EventManager eventManager = EventManager.builder(settings)
|
||||||
.register(clockEventConsumer)
|
.register(clockEventConsumer)
|
||||||
.build();
|
.build();
|
||||||
ClockEventManager clockEventManager = eventManager.getClockEventManager();
|
|
||||||
Thread.sleep(90000L);
|
Thread.sleep(90000L);
|
||||||
clockEventManager.close();
|
eventManager.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,15 +28,14 @@ public class FileFollowEventManagerTest {
|
||||||
EventManager eventManager = EventManager.builder(settings)
|
EventManager eventManager = EventManager.builder(settings)
|
||||||
.register(consumer)
|
.register(consumer)
|
||||||
.build();
|
.build();
|
||||||
FileFollowEventManager fileFolloeEventManager = eventManager.getFileFollowEventManager();
|
|
||||||
Thread.sleep(5000L);
|
Thread.sleep(5000L);
|
||||||
try (BufferedWriter bufferedWriter = Files.newBufferedWriter(path.resolve("test.txt"))) {
|
try (BufferedWriter bufferedWriter = Files.newBufferedWriter(path.resolve("test.txt"))) {
|
||||||
bufferedWriter.write("Hello");
|
bufferedWriter.write("Hello");
|
||||||
logger.log(Level.INFO, "Hello written");
|
logger.log(Level.INFO, "Hello written");
|
||||||
}
|
}
|
||||||
Thread.sleep(5000L);
|
Thread.sleep(5000L);
|
||||||
fileFolloeEventManager.close();
|
|
||||||
Files.delete(path.resolve("test.txt"));
|
Files.delete(path.resolve("test.txt"));
|
||||||
Files.delete(path);
|
Files.delete(path);
|
||||||
|
eventManager.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,7 @@ public class TimerEventManagerTest {
|
||||||
EventManager eventManager = EventManager.builder(settings)
|
EventManager eventManager = EventManager.builder(settings)
|
||||||
.register(consumer)
|
.register(consumer)
|
||||||
.build();
|
.build();
|
||||||
TimerEventManager timerEventManager = eventManager.getTimerEventManager();
|
TimerEventManagerService timerEventManager = eventManager.getTimerEventManagerService();
|
||||||
timerEventManager.put("testtimerevent", Instant.now().plusSeconds(5L), Map.of("a", "b"));
|
timerEventManager.put("testtimerevent", Instant.now().plusSeconds(5L), Map.of("a", "b"));
|
||||||
Thread.sleep(10000L);
|
Thread.sleep(10000L);
|
||||||
timerEventManager.close();
|
timerEventManager.close();
|
||||||
|
|
3
event-net-http/build.gradle
Normal file
3
event-net-http/build.gradle
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
dependencies {
|
||||||
|
|
||||||
|
}
|
5
event-syslog/build.gradle
Normal file
5
event-syslog/build.gradle
Normal file
|
@ -0,0 +1,5 @@
|
||||||
|
dependencies {
|
||||||
|
api project(':event-common')
|
||||||
|
implementation libs.datastructures.json.tiny
|
||||||
|
implementation libs.netty.handler
|
||||||
|
}
|
17
event-syslog/src/main/java/module-info.java
Normal file
17
event-syslog/src/main/java/module-info.java
Normal file
|
@ -0,0 +1,17 @@
|
||||||
|
import org.xbib.event.common.EventManagerService;
|
||||||
|
import org.xbib.event.syslog.SyslogEventManagerService;
|
||||||
|
|
||||||
|
module org.xbib.event.syslog {
|
||||||
|
exports org.xbib.event.syslog;
|
||||||
|
provides EventManagerService with SyslogEventManagerService;
|
||||||
|
requires org.xbib.event.common;
|
||||||
|
requires org.xbib.datastructures.json.tiny;
|
||||||
|
requires io.netty.buffer;
|
||||||
|
requires io.netty.common;
|
||||||
|
requires io.netty.transport;
|
||||||
|
requires io.netty.handler;
|
||||||
|
requires io.netty.codec;
|
||||||
|
requires java.logging;
|
||||||
|
requires org.xbib.settings.api;
|
||||||
|
requires org.xbib.event.api;
|
||||||
|
}
|
|
@ -1,6 +1,7 @@
|
||||||
package org.xbib.event.syslog;
|
package org.xbib.event.syslog;
|
||||||
|
|
||||||
import org.xbib.event.bus.AsyncEventBus;
|
import org.xbib.event.bus.EventBus;
|
||||||
|
import org.xbib.event.common.EventManagerService;
|
||||||
import org.xbib.settings.Settings;
|
import org.xbib.settings.Settings;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
|
@ -8,16 +9,22 @@ import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
public class SyslogEventManager implements Closeable {
|
public class SyslogEventManagerService implements EventManagerService, Closeable {
|
||||||
private static final Logger logger = Logger.getLogger(SyslogEventManager.class.getName());
|
private static final Logger logger = Logger.getLogger(SyslogEventManagerService.class.getName());
|
||||||
|
|
||||||
private final List<SyslogService> syslogServices;
|
private List<SyslogService> syslogServices;
|
||||||
|
|
||||||
public SyslogEventManager(Settings settings,
|
public SyslogEventManagerService() {}
|
||||||
AsyncEventBus eventBus) {
|
|
||||||
|
@Override
|
||||||
|
public SyslogEventManagerService init(Settings settings,
|
||||||
|
EventBus eventBus,
|
||||||
|
ClassLoader classLoader,
|
||||||
|
ExecutorService executorService) {
|
||||||
this.syslogServices = new ArrayList<>();
|
this.syslogServices = new ArrayList<>();
|
||||||
for (Map.Entry<String, Settings> entry : settings.getGroups("event.syslog").entrySet()) {
|
for (Map.Entry<String, Settings> entry : settings.getGroups("event.syslog").entrySet()) {
|
||||||
Settings definition = entry.getValue();
|
Settings definition = entry.getValue();
|
||||||
|
@ -31,6 +38,7 @@ public class SyslogEventManager implements Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
|
@ -0,0 +1 @@
|
||||||
|
org.xbib.event.syslog.SyslogEventManagerService
|
|
@ -1,6 +1,7 @@
|
||||||
package org.xbib.event.syslog;
|
package org.xbib.event.syslog.test;
|
||||||
|
|
||||||
import org.junit.jupiter.api.Test;
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.xbib.event.syslog.SyslogService;
|
||||||
import org.xbib.settings.Settings;
|
import org.xbib.settings.Settings;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -14,7 +15,7 @@ public class SyslogServiceTest {
|
||||||
.build();
|
.build();
|
||||||
SyslogService syslogService = new SyslogService(settings, null);
|
SyslogService syslogService = new SyslogService(settings, null);
|
||||||
syslogService.startTcp();
|
syslogService.startTcp();
|
||||||
Thread.sleep(60000L);
|
Thread.sleep(60000L); // send a syslog from console
|
||||||
syslogService.close();
|
syslogService.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -1,3 +1,3 @@
|
||||||
group = org.xbib
|
group = org.xbib
|
||||||
name = event
|
name = event
|
||||||
version = 0.1.0
|
version = 0.2.0
|
||||||
|
|
|
@ -45,3 +45,4 @@ include 'event-async'
|
||||||
include 'event-common'
|
include 'event-common'
|
||||||
include 'event-loop'
|
include 'event-loop'
|
||||||
include 'event-net-http'
|
include 'event-net-http'
|
||||||
|
include 'event-syslog'
|
||||||
|
|
Loading…
Reference in a new issue