working on clock/timer/file event tests

This commit is contained in:
Jörg Prante 2023-10-06 16:25:49 +02:00
parent 09dbbe9f67
commit 7bb652dba4
16 changed files with 369 additions and 49 deletions

View file

@ -4,6 +4,7 @@ import org.xbib.event.bus.AsyncEventBus;
import org.xbib.event.bus.SubscriberExceptionContext;
import org.xbib.event.bus.SubscriberExceptionHandler;
import org.xbib.event.clock.ClockEventManager;
import org.xbib.event.io.file.FileFollowEventManager;
import org.xbib.event.timer.TimerEventManager;
import org.xbib.settings.Settings;
@ -37,9 +38,12 @@ public final class EventManager {
private final TimerEventManager timerEventManager;
private final FileFollowEventManager fileFollowEventManager;
private EventManager(Settings settings) {
this.clockEventManager = new ClockEventManager(settings, eventBus, classLoader);
this.timerEventManager = new TimerEventManager(settings, eventBus, classLoader, ZoneId.systemDefault());
this.fileFollowEventManager = new FileFollowEventManager(settings, eventBus, executorService, classLoader);
}
public static EventManager newEventManager(Settings settings) {
@ -52,6 +56,10 @@ public final class EventManager {
eventBus.register(eventConsumer);
}
public List<EventConsumer> getEventConsumers() {
return eventConsumers;
}
public ClockEventManager getClockEventManager() {
return clockEventManager;
}
@ -60,8 +68,8 @@ public final class EventManager {
return timerEventManager;
}
public List<EventConsumer> getEventConsumers() {
return eventConsumers;
public FileFollowEventManager getFileFollowEventManager() {
return fileFollowEventManager;
}
public void close() throws IOException {

View file

@ -6,9 +6,9 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@SuppressWarnings("serial")
public class AbstractPersistenceStore extends LinkedHashMap<String, Object>
implements PersistenceStore<String, Object> {
public class AbstractPersistenceStore implements PersistenceStore<String, Object> {
private final Map<String, Object> internalMap;
private final Durability durability;
@ -20,6 +20,7 @@ public class AbstractPersistenceStore extends LinkedHashMap<String, Object>
Durability durability,
String storeName) {
super();
this.internalMap = new LinkedHashMap<>();
this.codec = codec;
this.durability = durability;
this.storeName = storeName;
@ -37,10 +38,10 @@ public class AbstractPersistenceStore extends LinkedHashMap<String, Object>
@Override
public void load() throws IOException {
clear();
internalMap.clear();
Map<String, Object> map = codec.read(storeName);
if (map != null) {
putAll(map);
internalMap.putAll(map);
}
}
@ -51,7 +52,7 @@ public class AbstractPersistenceStore extends LinkedHashMap<String, Object>
@Override
public void commit() throws IOException {
codec.write(storeName, this);
codec.write(storeName, internalMap);
}
@Override
@ -62,19 +63,36 @@ public class AbstractPersistenceStore extends LinkedHashMap<String, Object>
@SuppressWarnings("unchecked")
@Override
public void insert(String key, Object value) throws IOException {
putIfAbsent(key, new ArrayList<>());
List<Object> list = (List<Object>) get(key);
internalMap.putIfAbsent(key, new ArrayList<>());
List<Object> list = (List<Object>) internalMap.get(key);
list.add(value);
put(key, list);
internalMap.put(key, list);
commit();
}
@SuppressWarnings("unchecked")
void remove(String key, Object value) throws IOException {
putIfAbsent(key, new ArrayList<>());
List<Object> list = (List<Object>) get(key);
list.remove(value);
put(key, list);
@Override
public boolean remove(String key, Object value) throws IOException {
internalMap.putIfAbsent(key, new ArrayList<>());
List<Object> list = (List<Object>) internalMap.get(key);
boolean b = list.remove(value);
internalMap.put(key, list);
commit();
return b;
}
@Override
public Object get(String key) {
return internalMap.get(key);
}
@Override
public Object getOrDefault(String key, Object defaultValue) {
return internalMap.getOrDefault(key, defaultValue);
}
@Override
public void clear() {
internalMap.clear();
}
}

View file

@ -0,0 +1,172 @@
package org.xbib.event.persistence;
import org.xbib.net.util.ExceptionFormatter;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileVisitOption;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Instant;
import java.util.EnumSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Journal {
private static final Logger logger = Logger.getLogger(Journal.class.getName());
private final Path journalPath;
private final ReentrantReadWriteLock lock;
public Journal(String journalPathName) throws IOException {
this.journalPath = createJournal(journalPathName);
this.lock = new ReentrantReadWriteLock();
}
private static Path createJournal(String logPathName) throws IOException {
Path logPath = Paths.get(logPathName);
Files.createDirectories(logPath);
if (!Files.exists(logPath) || !Files.isWritable(logPath)) {
throw new IOException("unable to write to log path = " + logPath);
}
return logPath;
}
public void logRequest(String stamp, String request) throws IOException {
logger.log(Level.FINE, stamp + " request = " + request);
ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
writeLock.lock();
try (OutputStream outputStream = Files.newOutputStream(journalPath.resolve(stamp + ".log"), StandardOpenOption.CREATE)) {
outputStream.write(request.getBytes(StandardCharsets.UTF_8));
} finally {
writeLock.unlock();
}
}
public void logResponse(String stamp, String response) throws IOException {
logger.log(Level.FINE, stamp + " response = " + response);
ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
writeLock.lock();
Path path = journalPath.resolve("success").resolve(stamp + ".request");
Files.createDirectories(path.getParent());
Files.move(journalPath.resolve(stamp + ".log"), path);
try (OutputStream outputStream = Files.newOutputStream(journalPath.resolve("success").resolve(stamp + ".response"), StandardOpenOption.CREATE)) {
outputStream.write(response.getBytes(StandardCharsets.UTF_8));
} finally {
writeLock.unlock();
}
}
public void logFail(String stamp, Throwable t) throws IOException {
ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
writeLock.lock();
Path path = journalPath.resolve("fail").resolve(stamp + ".request");
Files.createDirectories(path.getParent());
Files.move(journalPath.resolve(stamp + ".log"), path);
// save throwable in extra file
try (OutputStream outputStream = Files.newOutputStream(journalPath.resolve("fail").resolve(stamp + ".exception"), StandardOpenOption.CREATE)) {
outputStream.write(ExceptionFormatter.format(t).getBytes(StandardCharsets.UTF_8));
} finally {
writeLock.unlock();
}
}
public void retry(Consumer<StampedEntry> consumer) throws IOException {
ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
writeLock.lock();
PathMatcher pathMatcher = journalPath.getFileSystem().getPathMatcher("glob:*.log");
try {
Files.walkFileTree(journalPath, EnumSet.of(FileVisitOption.FOLLOW_LINKS), Integer.MAX_VALUE, new SimpleFileVisitor<>() {
@Override
public FileVisitResult visitFile(Path p, BasicFileAttributes a) throws IOException {
if ((Files.isRegularFile(p) && pathMatcher.matches(p.getFileName()))) {
String stamp = p.getFileName().toString();
String entry = Files.readString(p);
consumer.accept(new StampedEntry(stamp, entry));
Files.delete(p);
}
return FileVisitResult.CONTINUE;
}
});
} finally {
writeLock.unlock();
}
}
public void purgeSuccess(Instant instant) throws IOException {
ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
writeLock.lock();
PathMatcher pathMatcher = journalPath.getFileSystem().getPathMatcher("glob:*.request");
try {
if (Files.exists(journalPath.resolve("success"))) {
Files.walkFileTree(journalPath.resolve("success"), EnumSet.of(FileVisitOption.FOLLOW_LINKS), Integer.MAX_VALUE, new SimpleFileVisitor<>() {
@Override
public FileVisitResult visitFile(Path p, BasicFileAttributes a) throws IOException {
if ((Files.isRegularFile(p) && pathMatcher.matches(p.getFileName()))) {
if (Files.getLastModifiedTime(p).toInstant().isBefore(instant)) {
Files.delete(p);
}
}
return FileVisitResult.CONTINUE;
}
});
}
} finally {
writeLock.unlock();
}
}
public void purgeFail(Instant instant) throws IOException {
ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
writeLock.lock();
PathMatcher pathMatcher = journalPath.getFileSystem().getPathMatcher("glob:*.request");
try {
if (Files.exists(journalPath.resolve("fail"))) {
Files.walkFileTree(journalPath.resolve("fail"), EnumSet.of(FileVisitOption.FOLLOW_LINKS), Integer.MAX_VALUE, new SimpleFileVisitor<>() {
@Override
public FileVisitResult visitFile(Path p, BasicFileAttributes a) throws IOException {
if ((Files.isRegularFile(p) && pathMatcher.matches(p.getFileName()))) {
if (Files.getLastModifiedTime(p).toInstant().isBefore(instant)) {
Files.delete(p);
}
}
return FileVisitResult.CONTINUE;
}
});
}
} finally {
writeLock.unlock();
}
}
public static class StampedEntry {
private final String stamp;
private final String entry;
public StampedEntry(String stamp, String entry) {
this.stamp = stamp;
this.entry = entry;
}
public String getStamp() {
return stamp;
}
public String getEntry() {
return entry;
}
}
}

View file

@ -3,7 +3,7 @@ package org.xbib.event.persistence;
import java.io.IOException;
import java.util.Map;
public interface PersistenceStore<K, V> extends Map<K, V> {
public interface PersistenceStore<K, V> {
Durability getDurability();
@ -11,6 +11,12 @@ public interface PersistenceStore<K, V> extends Map<K, V> {
void insert(K k, V v) throws IOException;
boolean remove(K k, V v) throws IOException;
Object get(String key);
Object getOrDefault(String key, Object defaultValue);
void load() throws IOException;
void begin();
@ -18,4 +24,6 @@ public interface PersistenceStore<K, V> extends Map<K, V> {
void commit() throws IOException;
void rollback();
void clear();
}

View file

@ -0,0 +1,24 @@
package org.xbib.event.timer;
import org.xbib.event.DefaultEvent;
import org.xbib.event.clock.ClockEvent;
import java.time.Instant;
public class DefaultTimerEvent extends DefaultEvent implements TimerEvent {
private Instant instant;
public DefaultTimerEvent() {
}
@Override
public void setInstant(Instant instant) {
this.instant = instant;
}
@Override
public Instant getInstant() {
return instant;
}
}

View file

@ -37,7 +37,7 @@ public class TimerEventManager implements Closeable {
ClassLoader classLoader,
ZoneId zoneId) {
this.services = new LinkedHashMap<>();
for (Map.Entry<String, Settings> entry : settings.getGroups("timer").entrySet()) {
for (Map.Entry<String, Settings> entry : settings.getGroups("event.timer").entrySet()) {
String name = entry.getKey();
Settings timerSettings = entry.getValue();
String className = timerSettings.get("class", TimerEvent.class.getName());
@ -47,7 +47,7 @@ public class TimerEventManager implements Closeable {
services.put(name, new TimerEventService(eventBus, name, eventClass, zoneId, persistenceStore));
logger.log(Level.INFO, "timer " + name + " active for timer event class " + className);
} catch (Exception e) {
logger.log(Level.WARNING, "unable to activate timer " + name + ", reason " + e.getMessage());
logger.log(Level.WARNING, "unable to activate timer " + name + ", reason: " + e.getMessage(), e);
}
}
}

View file

@ -12,6 +12,7 @@ import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@ -48,14 +49,13 @@ class TimerEventService implements Closeable {
logger.log(Level.INFO, "timer event service " + name + " loaded and rescheduled, " + tasknum + " timer tasks");
}
void schedule(Instant instant, Map<String, Object> task) throws IOException {
void schedule(Instant instant, Map<String, Object> map) throws IOException {
ZonedDateTime zonedDateTime = instant.atZone(zoneId);
Map<String, Object> task = new LinkedHashMap<>(map);
task.put("scheduled", zonedDateTime.format(DateTimeFormatter.ISO_DATE_TIME));
TimerEventTask timerEventTask = new TimerEventTask(task);
Date date = Date.from(instant);
timer.schedule(timerEventTask, date);
persistenceStore.insert("tasks", task);
logger.log(Level.INFO, "new task " + task + " added, scheduled at " + date);
timer.schedule(timerEventTask, Date.from(instant));
logger.log(Level.INFO, "new task " + map + " added, scheduled at " + instant);
}
@SuppressWarnings("unchecked")
@ -90,10 +90,11 @@ class TimerEventService implements Closeable {
public class TimerEventTask extends TimerTask {
private final Map<String,Object> map;
private final Map<String, Object> map;
public TimerEventTask(Map<String, Object> map) {
public TimerEventTask(Map<String, Object> map) throws IOException {
this.map = map;
persistenceStore.insert("tasks", this.map);
}
@Override
@ -101,16 +102,18 @@ class TimerEventService implements Closeable {
TimerEvent timerEvent;
try {
timerEvent = eventClass.getDeclaredConstructor().newInstance();
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
timerEvent.setInstant(Instant.now());
timerEvent.setMap(map);
logger.log(Level.FINE, "posting timer event " + timerEvent.getClass().getName() + " map = " + map);
eventBus.post(timerEvent);
logger.log(Level.FINE, "persistence before remove: " + persistenceStore.get("tasks"));
if (persistenceStore.remove("tasks", this.map)) {
logger.log(Level.FINE, "removal done");
}
logger.log(Level.FINE, "persistence after remove: " + persistenceStore.get("tasks"));
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | IOException e) {
logger.log(Level.SEVERE, e.getMessage(), e);
return;
}
timerEvent.setInstant(Instant.now());
timerEvent.setMap(map);
logger.log(Level.FINE, "posting timer event " + timerEvent.getClass().getName() + " map = " + map);
eventBus.post(timerEvent);
persistenceStore.remove("tasks", this);
logger.log(Level.FINE, "persistence after remove: " + persistenceStore.get("tasks"));
}
@Override
@ -125,7 +128,7 @@ class TimerEventService implements Closeable {
@Override
public boolean equals(Object object) {
return object instanceof TimerEventTask && Objects.equals(map, object);
return object instanceof TimerEventTask && Objects.equals(map, ((TimerEventTask) object).map);
}
}
}

View file

@ -1,6 +1,7 @@
package org.xbib.event.clock;
import org.junit.jupiter.api.Test;
import org.xbib.event.EventManager;
import org.xbib.settings.Settings;
import java.io.IOException;
@ -10,14 +11,15 @@ public class ClockEventManagerTest {
@Test
public void testEvents() throws IOException, InterruptedException {
TestClockEventConsumer clockEventConsumer = new TestClockEventConsumer();
EventManager.register(clockEventConsumer);
Settings settings = Settings.settingsBuilder()
.put("event.consumer.testconsumer.enabled", "true")
.put("event.consumer.testconsumer.class", "org.xbib.event.clock.TestClockEventConsumer")
.put("event.clock.testclockevent.enabled", "true")
.put("event.clock.testclockevent.class", "org.xbib.event.clock.TestClockEvent")
.put("event.clock.testclockevent.entry", "*/1 6-21 * * *")
.build();
ClockEventManager clockEventManager = new ClockEventManager(settings);
EventManager eventManager = EventManager.newEventManager(settings);
ClockEventManager clockEventManager = eventManager.getClockEventManager();
Thread.sleep(90000L);
clockEventManager.close();
}

View file

@ -4,15 +4,17 @@ import org.xbib.event.EventConsumer;
import org.xbib.event.bus.AllowConcurrentEvents;
import org.xbib.event.bus.Subscribe;
import java.time.Instant;
import java.util.logging.Level;
import java.util.logging.Logger;
public class TestClockEventConsumer implements EventConsumer {
private static final Logger logger = Logger.getLogger(SimpleClockEventConsumer.class.getName());
private static final Logger logger = Logger.getLogger(TestClockEventConsumer.class.getName());
@Subscribe
@AllowConcurrentEvents
void onEvent(TestClockEvent event) {
logger.info("received test clock event, instant = " + event.getInstant());
logger.log(Level.INFO, "received test clock event on " + Instant.now() + " event instant = " + event.getInstant());
}
}

View file

@ -0,0 +1,32 @@
package org.xbib.event.io.file;
import org.junit.jupiter.api.Test;
import org.xbib.event.EventManager;
import org.xbib.event.timer.TestTimerEventConsumer;
import org.xbib.event.timer.TimerEventManager;
import org.xbib.settings.Settings;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Map;
public class FileFollowEventManagerTest {
@Test
public void testFileFollowEvents() throws IOException, InterruptedException {
Path path = Files.createTempDirectory("testfilefollow");
TestFileFollowEventConsumer consumer = new TestFileFollowEventConsumer();
EventManager.register(consumer);
Settings settings = Settings.settingsBuilder()
.put("event.timer.testfilefollowevent.enabled", "true")
.put("event.timer.testfilefollowevent.class", "org.xbib.event.io.file.TestFileFollowEvent")
.put("event.timer.testfilefollowevent.path", path.toString())
.build();
EventManager eventManager = EventManager.newEventManager(settings);
FileFollowEventManager fileFolloeEventManager = eventManager.getFileFollowEventManager()
Thread.sleep(10000L);
fileFolloeEventManager.close();
}
}

View file

@ -0,0 +1,4 @@
package org.xbib.event.io.file;
public class TestFileFollowEvent extends DefaultFileFollowEvent {
}

View file

@ -0,0 +1,21 @@
package org.xbib.event.io.file;
import org.xbib.event.EventConsumer;
import org.xbib.event.bus.AllowConcurrentEvents;
import org.xbib.event.bus.Subscribe;
import org.xbib.event.timer.TestTimerEvent;
import java.time.Instant;
import java.util.logging.Level;
import java.util.logging.Logger;
public class TestFileFollowEventConsumer implements EventConsumer {
private static final Logger logger = Logger.getLogger(TestFileFollowEventConsumer.class.getName());
@Subscribe
@AllowConcurrentEvents
void onEvent(TestFileFollowEvent event) {
logger.log(Level.INFO, "received filefollw event path = " + event.getPath());
}
}

View file

@ -0,0 +1,4 @@
package org.xbib.event.timer;
public class TestTimerEvent extends DefaultTimerEvent {
}

View file

@ -0,0 +1,20 @@
package org.xbib.event.timer;
import org.xbib.event.EventConsumer;
import org.xbib.event.bus.AllowConcurrentEvents;
import org.xbib.event.bus.Subscribe;
import java.time.Instant;
import java.util.logging.Level;
import java.util.logging.Logger;
public class TestTimerEventConsumer implements EventConsumer {
private static final Logger logger = Logger.getLogger(TestTimerEventConsumer.class.getName());
@Subscribe
@AllowConcurrentEvents
void onEvent(TestTimerEvent event) {
logger.log(Level.INFO, "received test timer event on " + Instant.now() + " event instant = " + event.getInstant());
}
}

View file

@ -1,4 +0,0 @@
package org.xbib.event.timer;
public class TimerEvent {
}

View file

@ -1,6 +1,7 @@
package org.xbib.event.timer;
import org.junit.jupiter.api.Test;
import org.xbib.event.EventManager;
import org.xbib.settings.Settings;
import java.io.IOException;
@ -10,12 +11,17 @@ import java.util.Map;
public class TimerEventManagerTest {
@Test
public void testEvents() throws IOException {
public void testTimerEvents() throws IOException, InterruptedException {
TestTimerEventConsumer consumer = new TestTimerEventConsumer();
EventManager.register(consumer);
Settings settings = Settings.settingsBuilder()
.put("event.consumer.testconsumer.type", "org.xbib.event.timer.TimerEventConsumer")
.put("event.consumer.testconsumer.enabled", "true")
.put("event.timer.testtimerevent.enabled", "true")
.put("event.timer.testtimerevent.class", "org.xbib.event.timer.TestTimerEvent")
.build();
TimerEventManager timerEventManager = new TimerEventManager(settings);
timerEventManager.put("key", Instant.now(), Map.of("a", "b"));
EventManager eventManager = EventManager.newEventManager(settings);
TimerEventManager timerEventManager = eventManager.getTimerEventManager();
timerEventManager.put("testtimerevent", Instant.now().plusSeconds(5L), Map.of("a", "b"));
Thread.sleep(10000L);
timerEventManager.close();
}
}