add event manager

This commit is contained in:
Jörg Prante 2023-10-05 18:17:41 +02:00
parent 6a67fe5db2
commit 09dbbe9f67
8 changed files with 363 additions and 61 deletions

View file

@ -1,15 +1,17 @@
module org.xbib.event {
exports org.xbib.event;
exports org.xbib.event.bus;
exports org.xbib.event.clock;
exports org.xbib.event.io;
exports org.xbib.event.io.file;
exports org.xbib.event.loop.selector;
exports org.xbib.event.loop;
exports org.xbib.event.persistence;
exports org.xbib.event.queue;
exports org.xbib.event.syslog;
exports org.xbib.event.yield;
exports org.xbib.event.io;
exports org.xbib.event.loop;
exports org.xbib.event.loop.selector;
exports org.xbib.event.timer;
exports org.xbib.event.util;
exports org.xbib.event.yield;
exports org.xbib.event;
requires org.xbib.datastructures.api;
requires org.xbib.datastructures.common;
requires org.xbib.datastructures.json.tiny;

View file

@ -0,0 +1,33 @@
package org.xbib.event;
import java.util.Map;
public class DefaultEvent implements Event {
private String key;
private Map<String, Object> map;
public DefaultEvent() {
}
@Override
public void setKey(String key) {
this.key = key;
}
@Override
public String getKey() {
return key;
}
@Override
public void setMap(Map<String, Object> map) {
this.map = map;
}
@Override
public Map<String, Object> getMap() {
return map;
}
}

View file

@ -1,52 +1,92 @@
package org.xbib.event;
import org.xbib.event.bus.AsyncEventBus;
import org.xbib.event.bus.EventBus;
import org.xbib.event.bus.SubscriberExceptionContext;
import org.xbib.event.bus.SubscriberExceptionHandler;
import org.xbib.event.clock.ClockEventManager;
import org.xbib.event.timer.TimerEventManager;
import org.xbib.settings.Settings;
import java.io.IOException;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
public class EventManager {
public final class EventManager {
private static final Logger logger = Logger.getLogger(EventManager.class.getName());
public EventManager(Settings settings) {
this(settings, new AsyncEventBus(Executors.newSingleThreadExecutor()), ClockEventManager.class.getClassLoader());
private static ExecutorService executorService;
private static ClassLoader classLoader;
private static AsyncEventBus eventBus;
private static List<EventConsumer> eventConsumers;
static {
createInstances();
}
public EventManager(Settings settings,
EventBus eventBus,
ClassLoader classLoader) {
// register consumers
List<String> consumerList = new ArrayList<>();
for (Map.Entry<String, Settings> consumers : settings.getGroups("event.consumer").entrySet()) {
Settings entrySettings = consumers.getValue();
if (entrySettings.getAsBoolean("enabled", true)) {
String className = entrySettings.get("class");
try {
if (className != null) {
@SuppressWarnings("unchecked")
Class<? extends EventConsumer> consumerClass = (Class<? extends EventConsumer>) classLoader.loadClass(className);
eventBus.register(consumerClass.getDeclaredConstructor().newInstance());
logger.log(Level.INFO, "consumer " + consumerClass + " registered");
consumerList.add(consumerClass.getName());
}
} catch (Exception e) {
logger.log(Level.WARNING, "unable to load consumer " + className + ", reason " + e.getMessage());
}
}
}
logger.log(Level.INFO, "consumers = " + consumerList);
private final ClockEventManager clockEventManager;
private final TimerEventManager timerEventManager;
private EventManager(Settings settings) {
this.clockEventManager = new ClockEventManager(settings, eventBus, classLoader);
this.timerEventManager = new TimerEventManager(settings, eventBus, classLoader, ZoneId.systemDefault());
}
public void start() {
public static EventManager newEventManager(Settings settings) {
return new EventManager(settings);
}
public static void register(EventConsumer eventConsumer) {
Objects.requireNonNull(eventConsumer, "event consumer must not be null");
eventConsumers.add(eventConsumer);
eventBus.register(eventConsumer);
}
public ClockEventManager getClockEventManager() {
return clockEventManager;
}
public TimerEventManager getTimerEventManager() {
return timerEventManager;
}
public List<EventConsumer> getEventConsumers() {
return eventConsumers;
}
public void close() throws IOException {
clockEventManager.close();
}
private static class EventManagerExceptionHandler implements SubscriberExceptionHandler {
@Override
public void handleException(Throwable exception, SubscriberExceptionContext context) {
logger.log(Level.SEVERE, exception.getMessage(), exception);
}
}
private static void createInstances() {
if (executorService == null) {
executorService = Executors.newFixedThreadPool(2);
}
if (classLoader == null) {
classLoader = EventManager.class.getClassLoader();
}
if (eventBus == null) {
eventBus = new AsyncEventBus(executorService, new EventManagerExceptionHandler());
}
if (eventConsumers == null) {
eventConsumers = new ArrayList<>();
}
}
}

View file

@ -1,39 +1,16 @@
package org.xbib.event.clock;
import org.xbib.event.DefaultEvent;
import java.time.Instant;
import java.util.Map;
public class DefaultClockEvent implements ClockEvent {
private String key;
private Map<String, Object> map;
public class DefaultClockEvent extends DefaultEvent implements ClockEvent {
private Instant instant;
public DefaultClockEvent() {
}
@Override
public void setKey(String key) {
this.key = key;
}
@Override
public String getKey() {
return key;
}
@Override
public void setMap(Map<String, Object> map) {
this.map = map;
}
@Override
public Map<String, Object> getMap() {
return map;
}
@Override
public void setInstant(Instant instant) {
this.instant = instant;

View file

@ -0,0 +1,20 @@
package org.xbib.event.io.file;
import org.xbib.event.DefaultEvent;
import java.nio.file.Path;
public class DefaultFileFollowEvent extends DefaultEvent implements FileFollowEvent {
private Path path;
@Override
public void setPath(Path path) {
this.path = path;
}
@Override
public Path getPath() {
return path;
}
}

View file

@ -0,0 +1,12 @@
package org.xbib.event.io.file;
import org.xbib.event.Event;
import java.nio.file.Path;
public interface FileFollowEvent extends Event {
void setPath(Path path);
Path getPath();
}

View file

@ -0,0 +1,55 @@
package org.xbib.event.io.file;
import org.xbib.event.bus.AsyncEventBus;
import org.xbib.settings.Settings;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Pattern;
public class FileFollowEventManager {
private static final Logger logger = Logger.getLogger(FileFollowEventManager.class.getName());
private final Map<Future<?>, FileFollowEventService> eventServiceMap;
@SuppressWarnings("unchecked")
public FileFollowEventManager(Settings settings,
AsyncEventBus eventBus,
ExecutorService executorService,
ClassLoader classLoader) {
this.eventServiceMap = new LinkedHashMap<>();
for (Map.Entry<String, Settings> followfiles : settings.getGroups("filefollow").entrySet()) {
Settings definition = followfiles.getValue();
String baseStr = definition.get("base");
String patternStr = definition.get("pattern");
if (baseStr != null && patternStr != null) {
Path base = Paths.get(baseStr);
Pattern pattern = Pattern.compile(patternStr);
String className = definition.get("class", FileFollowEvent.class.getName());
try {
Class<? extends FileFollowEvent> eventClass = (Class<? extends FileFollowEvent>) classLoader.loadClass(className);
FileFollowEventService fileFollowEventService = new FileFollowEventService(definition, eventBus, base, pattern, eventClass);
Future<?> future = executorService.submit(fileFollowEventService);
eventServiceMap.put(future, fileFollowEventService);
logger.log(Level.INFO, "file follow service " + followfiles.getKey() + " with base " + base + " and pattern " + pattern + " added, event class " + className);
} catch (Exception e) {
logger.log(Level.SEVERE, "unable to create file follow service " + followfiles.getKey() + ", reason " + e.getMessage(), e);
}
}
}
}
public void close() {
for (Map.Entry<Future<?>, FileFollowEventService> entry : eventServiceMap.entrySet()) {
entry.getValue().setKeepWatching(false);
entry.getKey().cancel(true);
}
}
}

View file

@ -0,0 +1,163 @@
package org.xbib.event.io.file;
import org.xbib.event.bus.EventBus;
import org.xbib.settings.Settings;
import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class FileFollowEventService implements Callable<Integer>, Closeable {
private static final Logger logger = Logger.getLogger(FileFollowEventService.class.getName());
private final Settings settings;
private final EventBus eventBus;
private final Path base;
private final Pattern pattern;
private final Class<? extends FileFollowEvent> eventClass;
private final WatchService watchService;
private final WatchKey watchKey;
private final Map<Path, Long> fileSizes;
private int eventCount;
private volatile boolean keepWatching;
public FileFollowEventService(Settings settings,
EventBus eventBus,
Path base,
Pattern pattern,
Class<? extends FileFollowEvent> eventClass) throws IOException {
this.settings = settings;
this.eventBus = eventBus;
this.base = base;
this.pattern = pattern;
this.eventClass = eventClass;
FileSystem fileSystem = base.getFileSystem();
this.watchService = fileSystem.newWatchService();
WatchEvent.Kind<?>[] kinds = new WatchEvent.Kind<?>[1];
kinds[0] = StandardWatchEventKinds.ENTRY_MODIFY;
this.watchKey = base.register(watchService, kinds);
// limit file size memory to 32 files
this.fileSizes = new LimitedMap<>(32);
this.keepWatching = true;
}
public void setKeepWatching(boolean keepWatching) {
this.keepWatching = keepWatching;
}
@SuppressWarnings("unchecked")
@Override
public Integer call() {
while (keepWatching) {
WatchKey key = null;
try {
key = watchService.take();
for (WatchEvent<?> watchEvent : key.pollEvents()) {
WatchEvent<Path> pathWatchEvent = (WatchEvent<Path>) watchEvent;
Path path = pathWatchEvent.context();
Matcher matcher = pattern.matcher(path.toString());
if (!matcher.matches()) {
continue;
}
Path p = base.resolve(path);
try (SeekableByteChannel channel = Files.newByteChannel(p, StandardOpenOption.READ)) {
long lastSize = fileSizes.getOrDefault(p, 0L);
long currentSize = p.toFile().length();
fileSizes.put(p, currentSize);
// We have no idea where to start reading if this is the first time.
// Avoid reading the whole file at first time, read only real diff.
// This means first event is swallowed!
if (lastSize > 0L) {
String content = readRange(channel, lastSize, currentSize);
// split content by line, this allows pattern matching without preprocessing in worker
for (String line : content.split("\n")) {
FileFollowEvent event = eventClass.getDeclaredConstructor().newInstance();
event.setKey(path.toString());
event.setPath(base);
event.setMap(new LinkedHashMap<>());
event.getMap().putAll(settings.getAsStructuredMap());
event.getMap().put("content", line);
eventBus.post(event);
eventCount++;
}
}
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.log(Level.SEVERE, e.getMessage(), e);
} finally {
if (key != null) {
key.reset();
}
}
}
return eventCount;
}
@Override
public void close() throws IOException {
watchService.close();
}
private static String readRange(SeekableByteChannel fileChannel, long from, long to) throws IOException {
int delta = (int) (to - from);
if (delta <= 0) {
return "";
}
ByteBuffer byteBuffer = ByteBuffer.allocate(delta);
fileChannel.position(from);
int numRead = fileChannel.read(byteBuffer);
byteBuffer.flip();
CharBuffer chb = StandardCharsets.UTF_8.decode(byteBuffer);
byteBuffer.clear();
if (numRead <= 0) {
throw new IOException("numRead less or equal to 0");
}
return chb.toString();
}
private static class LimitedMap<K, V> extends LinkedHashMap<K, V> {
private final int n;
LimitedMap(int n) {
super();
this.n = n;
}
@Override
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
return size() > n;
}
}
}