add clock event suspend/resume

This commit is contained in:
Jörg Prante 2024-01-05 17:45:26 +01:00
parent 48bbb2e455
commit d72fee9ce8
3 changed files with 45 additions and 12 deletions

View file

@ -1,3 +1,3 @@
group = org.xbib
name = event
version = 0.0.11
version = 0.0.12

View file

@ -8,6 +8,8 @@ import org.xbib.time.schedule.CronSchedule;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@ -21,6 +23,8 @@ public class ClockEventManager implements Closeable {
private final CronSchedule<Integer> cronSchedule;
private final List<String> suspended;
public ClockEventManager(Settings settings) {
this(settings, new AsyncEventBus(Executors.newSingleThreadExecutor()), ClockEventManager.class.getClassLoader());
}
@ -28,6 +32,7 @@ public class ClockEventManager implements Closeable {
public ClockEventManager(Settings settings,
EventBus eventBus,
ClassLoader classLoader) {
this.suspended = new ArrayList<>();
ThreadFactory threadFactory = new ThreadFactory() {
int n = 1;
@Override
@ -38,8 +43,9 @@ public class ClockEventManager implements Closeable {
ScheduledExecutorService executorService =
Executors.newScheduledThreadPool(settings.getAsInt("pool.size", 2), threadFactory);
this.cronSchedule = new CronSchedule<>(executorService);
for (Map.Entry<String, Settings> cronjobs : settings.getGroups("event.clock").entrySet()) {
Settings entrySettings = cronjobs.getValue();
for (Map.Entry<String, Settings> clockEventService : settings.getGroups("event.clock").entrySet()) {
String name = clockEventService.getKey();
Settings entrySettings = clockEventService.getValue();
if (entrySettings.getAsBoolean("enabled", true)) {
String entry = entrySettings.get("entry");
if (entry != null) {
@ -48,21 +54,35 @@ public class ClockEventManager implements Closeable {
if (className != null) {
@SuppressWarnings("unchecked")
Class<? extends ClockEvent> eventClass = (Class<? extends ClockEvent>) classLoader.loadClass(className);
cronSchedule.add(className, CronExpression.parse(entry), new ClockEventService(eventBus, eventClass));
logger.log(Level.INFO, "cron job " + cronjobs.getKey() + " scheduled on " + entry + ", event class " + className);
cronSchedule.add(className, CronExpression.parse(entry), new ClockEventService(this, eventBus, name, eventClass));
logger.log(Level.INFO, "cron job " + clockEventService.getKey() + " scheduled on " + entry + ", event class " + className);
} else {
logger.log(Level.WARNING, "no class specified");
}
} catch (Exception e) {
logger.log(Level.WARNING, "unable to schedule cron job " + cronjobs.getKey() + ", reason " + e.getMessage());
logger.log(Level.WARNING, "unable to schedule cron job " + clockEventService.getKey() + ", reason " + e.getMessage());
}
}
} else {
logger.log(Level.WARNING, "clock event service " + name + " in configuration not enabled");
}
}
logger.log(Level.INFO, "entries = " + cronSchedule.getEntries());
cronSchedule.start();
}
public List<String> getSuspended() {
return suspended;
}
public void suspend(String name) {
suspended.add(name);
}
public void resume(String name) {
suspended.remove(name);
}
@Override
public void close() throws IOException {
cronSchedule.close();

View file

@ -10,24 +10,37 @@ public class ClockEventService implements Callable<Integer> {
private static final Logger logger = Logger.getLogger(ClockEventService.class.getName());
private final ClockEventManager manager;
private final EventBus eventBus;
private final String name;
private final Class<? extends ClockEvent> eventClass;
public ClockEventService(EventBus eventBus,
public ClockEventService(ClockEventManager manager,
EventBus eventBus,
String name,
Class<? extends ClockEvent> eventClass) {
this.manager = manager;
this.eventBus = eventBus;
this.name = name;
this.eventClass = eventClass;
}
@Override
public Integer call() throws Exception {
try {
logger.log(Level.FINE, "posting clock event " + eventClass.getName());
ClockEvent clockEvent = eventClass.getDeclaredConstructor().newInstance();
clockEvent.setInstant(Instant.now());
eventBus.post(clockEvent);
return 0;
if (manager.getSuspended().contains(name)) {
logger.log(Level.FINE, "clock event " + name + " suspended");
return 1;
} else {
logger.log(Level.FINE, "posting clock event " + eventClass.getName());
ClockEvent clockEvent = eventClass.getDeclaredConstructor().newInstance();
clockEvent.setInstant(Instant.now());
eventBus.post(clockEvent);
return 0;
}
} catch (Throwable t) {
logger.log(Level.WARNING, t.getMessage(), t);
return 1;