add event manager

This commit is contained in:
Jörg Prante 2023-10-04 17:15:38 +02:00
parent 4daa443cb3
commit 6a67fe5db2
2 changed files with 52 additions and 23 deletions

View file

@ -0,0 +1,52 @@
package org.xbib.event;
import org.xbib.event.bus.AsyncEventBus;
import org.xbib.event.bus.EventBus;
import org.xbib.event.clock.ClockEventManager;
import org.xbib.settings.Settings;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
public class EventManager {
private static final Logger logger = Logger.getLogger(EventManager.class.getName());
public EventManager(Settings settings) {
this(settings, new AsyncEventBus(Executors.newSingleThreadExecutor()), ClockEventManager.class.getClassLoader());
}
public EventManager(Settings settings,
EventBus eventBus,
ClassLoader classLoader) {
// register consumers
List<String> consumerList = new ArrayList<>();
for (Map.Entry<String, Settings> consumers : settings.getGroups("event.consumer").entrySet()) {
Settings entrySettings = consumers.getValue();
if (entrySettings.getAsBoolean("enabled", true)) {
String className = entrySettings.get("class");
try {
if (className != null) {
@SuppressWarnings("unchecked")
Class<? extends EventConsumer> consumerClass = (Class<? extends EventConsumer>) classLoader.loadClass(className);
eventBus.register(consumerClass.getDeclaredConstructor().newInstance());
logger.log(Level.INFO, "consumer " + consumerClass + " registered");
consumerList.add(consumerClass.getName());
}
} catch (Exception e) {
logger.log(Level.WARNING, "unable to load consumer " + className + ", reason " + e.getMessage());
}
}
}
logger.log(Level.INFO, "consumers = " + consumerList);
}
public void start() {
}
}

View file

@ -1,6 +1,5 @@
package org.xbib.event.clock; package org.xbib.event.clock;
import org.xbib.event.EventConsumer;
import org.xbib.event.bus.AsyncEventBus; import org.xbib.event.bus.AsyncEventBus;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.EventBus;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
@ -9,8 +8,6 @@ import org.xbib.time.schedule.CronSchedule;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
@ -41,26 +38,6 @@ public class ClockEventManager implements Closeable {
ScheduledExecutorService executorService = ScheduledExecutorService executorService =
Executors.newScheduledThreadPool(settings.getAsInt("pool.size", 2), threadFactory); Executors.newScheduledThreadPool(settings.getAsInt("pool.size", 2), threadFactory);
this.cronSchedule = new CronSchedule<>(executorService); this.cronSchedule = new CronSchedule<>(executorService);
// register consumers
List<String> consumerList = new ArrayList<>();
for (Map.Entry<String, Settings> consumers : settings.getGroups("event.consumer").entrySet()) {
Settings entrySettings = consumers.getValue();
if (entrySettings.getAsBoolean("enabled", true)) {
String className = entrySettings.get("class");
try {
if (className != null) {
@SuppressWarnings("unchecked")
Class<? extends EventConsumer> consumerClass = (Class<? extends EventConsumer>) classLoader.loadClass(className);
eventBus.register(consumerClass.getDeclaredConstructor().newInstance());
logger.log(Level.INFO, "consumer " + consumerClass + " registered");
consumerList.add(consumerClass.getName());
}
} catch (Exception e) {
logger.log(Level.WARNING, "unable to load consumer " + className + ", reason " + e.getMessage());
}
}
}
logger.log(Level.INFO, "consumers = " + consumerList);
for (Map.Entry<String, Settings> cronjobs : settings.getGroups("event.clock").entrySet()) { for (Map.Entry<String, Settings> cronjobs : settings.getGroups("event.clock").entrySet()) {
Settings entrySettings = cronjobs.getValue(); Settings entrySettings = cronjobs.getValue();
if (entrySettings.getAsBoolean("enabled", true)) { if (entrySettings.getAsBoolean("enabled", true)) {