diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java index 44e6665..db430b8 100644 --- a/src/main/java/module-info.java +++ b/src/main/java/module-info.java @@ -1,15 +1,17 @@ module org.xbib.event { - exports org.xbib.event; exports org.xbib.event.bus; exports org.xbib.event.clock; + exports org.xbib.event.io; + exports org.xbib.event.io.file; + exports org.xbib.event.loop.selector; + exports org.xbib.event.loop; exports org.xbib.event.persistence; exports org.xbib.event.queue; exports org.xbib.event.syslog; - exports org.xbib.event.yield; - exports org.xbib.event.io; - exports org.xbib.event.loop; - exports org.xbib.event.loop.selector; + exports org.xbib.event.timer; exports org.xbib.event.util; + exports org.xbib.event.yield; + exports org.xbib.event; requires org.xbib.datastructures.api; requires org.xbib.datastructures.common; requires org.xbib.datastructures.json.tiny; diff --git a/src/main/java/org/xbib/event/DefaultEvent.java b/src/main/java/org/xbib/event/DefaultEvent.java new file mode 100644 index 0000000..2ad5858 --- /dev/null +++ b/src/main/java/org/xbib/event/DefaultEvent.java @@ -0,0 +1,33 @@ +package org.xbib.event; + +import java.util.Map; + +public class DefaultEvent implements Event { + + private String key; + + private Map map; + + public DefaultEvent() { + } + + @Override + public void setKey(String key) { + this.key = key; + } + + @Override + public String getKey() { + return key; + } + + @Override + public void setMap(Map map) { + this.map = map; + } + + @Override + public Map getMap() { + return map; + } +} diff --git a/src/main/java/org/xbib/event/EventManager.java b/src/main/java/org/xbib/event/EventManager.java index c2592c6..ce3b579 100644 --- a/src/main/java/org/xbib/event/EventManager.java +++ b/src/main/java/org/xbib/event/EventManager.java @@ -1,52 +1,92 @@ package org.xbib.event; import org.xbib.event.bus.AsyncEventBus; -import org.xbib.event.bus.EventBus; +import org.xbib.event.bus.SubscriberExceptionContext; +import org.xbib.event.bus.SubscriberExceptionHandler; import org.xbib.event.clock.ClockEventManager; +import org.xbib.event.timer.TimerEventManager; import org.xbib.settings.Settings; +import java.io.IOException; +import java.time.ZoneId; import java.util.ArrayList; import java.util.List; -import java.util.Map; +import java.util.Objects; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.logging.Level; import java.util.logging.Logger; -public class EventManager { +public final class EventManager { private static final Logger logger = Logger.getLogger(EventManager.class.getName()); - public EventManager(Settings settings) { - this(settings, new AsyncEventBus(Executors.newSingleThreadExecutor()), ClockEventManager.class.getClassLoader()); + private static ExecutorService executorService; + + private static ClassLoader classLoader; + + private static AsyncEventBus eventBus; + + private static List eventConsumers; + + static { + createInstances(); } - public EventManager(Settings settings, - EventBus eventBus, - ClassLoader classLoader) { - // register consumers - List consumerList = new ArrayList<>(); - for (Map.Entry consumers : settings.getGroups("event.consumer").entrySet()) { - Settings entrySettings = consumers.getValue(); - if (entrySettings.getAsBoolean("enabled", true)) { - String className = entrySettings.get("class"); - try { - if (className != null) { - @SuppressWarnings("unchecked") - Class consumerClass = (Class) classLoader.loadClass(className); - eventBus.register(consumerClass.getDeclaredConstructor().newInstance()); - logger.log(Level.INFO, "consumer " + consumerClass + " registered"); - consumerList.add(consumerClass.getName()); - } - } catch (Exception e) { - logger.log(Level.WARNING, "unable to load consumer " + className + ", reason " + e.getMessage()); - } - } + private final ClockEventManager clockEventManager; + + private final TimerEventManager timerEventManager; + + private EventManager(Settings settings) { + this.clockEventManager = new ClockEventManager(settings, eventBus, classLoader); + this.timerEventManager = new TimerEventManager(settings, eventBus, classLoader, ZoneId.systemDefault()); + } + + public static EventManager newEventManager(Settings settings) { + return new EventManager(settings); + } + + public static void register(EventConsumer eventConsumer) { + Objects.requireNonNull(eventConsumer, "event consumer must not be null"); + eventConsumers.add(eventConsumer); + eventBus.register(eventConsumer); + } + + public ClockEventManager getClockEventManager() { + return clockEventManager; + } + + public TimerEventManager getTimerEventManager() { + return timerEventManager; + } + + public List getEventConsumers() { + return eventConsumers; + } + + public void close() throws IOException { + clockEventManager.close(); + } + + private static class EventManagerExceptionHandler implements SubscriberExceptionHandler { + @Override + public void handleException(Throwable exception, SubscriberExceptionContext context) { + logger.log(Level.SEVERE, exception.getMessage(), exception); } - logger.log(Level.INFO, "consumers = " + consumerList); - } - public void start() { - + private static void createInstances() { + if (executorService == null) { + executorService = Executors.newFixedThreadPool(2); + } + if (classLoader == null) { + classLoader = EventManager.class.getClassLoader(); + } + if (eventBus == null) { + eventBus = new AsyncEventBus(executorService, new EventManagerExceptionHandler()); + } + if (eventConsumers == null) { + eventConsumers = new ArrayList<>(); + } } } diff --git a/src/main/java/org/xbib/event/clock/DefaultClockEvent.java b/src/main/java/org/xbib/event/clock/DefaultClockEvent.java index 92ba1ea..0bff810 100644 --- a/src/main/java/org/xbib/event/clock/DefaultClockEvent.java +++ b/src/main/java/org/xbib/event/clock/DefaultClockEvent.java @@ -1,39 +1,16 @@ package org.xbib.event.clock; +import org.xbib.event.DefaultEvent; + import java.time.Instant; -import java.util.Map; -public class DefaultClockEvent implements ClockEvent { - - private String key; - - private Map map; +public class DefaultClockEvent extends DefaultEvent implements ClockEvent { private Instant instant; public DefaultClockEvent() { } - @Override - public void setKey(String key) { - this.key = key; - } - - @Override - public String getKey() { - return key; - } - - @Override - public void setMap(Map map) { - this.map = map; - } - - @Override - public Map getMap() { - return map; - } - @Override public void setInstant(Instant instant) { this.instant = instant; diff --git a/src/main/java/org/xbib/event/io/file/DefaultFileFollowEvent.java b/src/main/java/org/xbib/event/io/file/DefaultFileFollowEvent.java new file mode 100644 index 0000000..8bd4cf6 --- /dev/null +++ b/src/main/java/org/xbib/event/io/file/DefaultFileFollowEvent.java @@ -0,0 +1,20 @@ +package org.xbib.event.io.file; + +import org.xbib.event.DefaultEvent; + +import java.nio.file.Path; + +public class DefaultFileFollowEvent extends DefaultEvent implements FileFollowEvent { + + private Path path; + + @Override + public void setPath(Path path) { + this.path = path; + } + + @Override + public Path getPath() { + return path; + } +} diff --git a/src/main/java/org/xbib/event/io/file/FileFollowEvent.java b/src/main/java/org/xbib/event/io/file/FileFollowEvent.java new file mode 100644 index 0000000..6b13990 --- /dev/null +++ b/src/main/java/org/xbib/event/io/file/FileFollowEvent.java @@ -0,0 +1,12 @@ +package org.xbib.event.io.file; + +import org.xbib.event.Event; + +import java.nio.file.Path; + +public interface FileFollowEvent extends Event { + + void setPath(Path path); + + Path getPath(); +} diff --git a/src/main/java/org/xbib/event/io/file/FileFollowEventManager.java b/src/main/java/org/xbib/event/io/file/FileFollowEventManager.java new file mode 100644 index 0000000..b786956 --- /dev/null +++ b/src/main/java/org/xbib/event/io/file/FileFollowEventManager.java @@ -0,0 +1,55 @@ +package org.xbib.event.io.file; + +import org.xbib.event.bus.AsyncEventBus; +import org.xbib.settings.Settings; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.regex.Pattern; + +public class FileFollowEventManager { + + private static final Logger logger = Logger.getLogger(FileFollowEventManager.class.getName()); + + private final Map, FileFollowEventService> eventServiceMap; + + @SuppressWarnings("unchecked") + public FileFollowEventManager(Settings settings, + AsyncEventBus eventBus, + ExecutorService executorService, + ClassLoader classLoader) { + this.eventServiceMap = new LinkedHashMap<>(); + for (Map.Entry followfiles : settings.getGroups("filefollow").entrySet()) { + Settings definition = followfiles.getValue(); + String baseStr = definition.get("base"); + String patternStr = definition.get("pattern"); + if (baseStr != null && patternStr != null) { + Path base = Paths.get(baseStr); + Pattern pattern = Pattern.compile(patternStr); + String className = definition.get("class", FileFollowEvent.class.getName()); + try { + Class eventClass = (Class) classLoader.loadClass(className); + FileFollowEventService fileFollowEventService = new FileFollowEventService(definition, eventBus, base, pattern, eventClass); + Future future = executorService.submit(fileFollowEventService); + eventServiceMap.put(future, fileFollowEventService); + logger.log(Level.INFO, "file follow service " + followfiles.getKey() + " with base " + base + " and pattern " + pattern + " added, event class " + className); + } catch (Exception e) { + logger.log(Level.SEVERE, "unable to create file follow service " + followfiles.getKey() + ", reason " + e.getMessage(), e); + } + } + } + } + + public void close() { + for (Map.Entry, FileFollowEventService> entry : eventServiceMap.entrySet()) { + entry.getValue().setKeepWatching(false); + entry.getKey().cancel(true); + } + } +} diff --git a/src/main/java/org/xbib/event/io/file/FileFollowEventService.java b/src/main/java/org/xbib/event/io/file/FileFollowEventService.java new file mode 100644 index 0000000..09460b0 --- /dev/null +++ b/src/main/java/org/xbib/event/io/file/FileFollowEventService.java @@ -0,0 +1,163 @@ +package org.xbib.event.io.file; + +import org.xbib.event.bus.EventBus; +import org.xbib.settings.Settings; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.CharBuffer; +import java.nio.channels.SeekableByteChannel; +import java.nio.charset.StandardCharsets; +import java.nio.file.FileSystem; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class FileFollowEventService implements Callable, Closeable { + + private static final Logger logger = Logger.getLogger(FileFollowEventService.class.getName()); + + private final Settings settings; + + private final EventBus eventBus; + + private final Path base; + + private final Pattern pattern; + + private final Class eventClass; + + private final WatchService watchService; + + private final WatchKey watchKey; + + private final Map fileSizes; + + private int eventCount; + + private volatile boolean keepWatching; + + public FileFollowEventService(Settings settings, + EventBus eventBus, + Path base, + Pattern pattern, + Class eventClass) throws IOException { + this.settings = settings; + this.eventBus = eventBus; + this.base = base; + this.pattern = pattern; + this.eventClass = eventClass; + FileSystem fileSystem = base.getFileSystem(); + this.watchService = fileSystem.newWatchService(); + WatchEvent.Kind[] kinds = new WatchEvent.Kind[1]; + kinds[0] = StandardWatchEventKinds.ENTRY_MODIFY; + this.watchKey = base.register(watchService, kinds); + // limit file size memory to 32 files + this.fileSizes = new LimitedMap<>(32); + this.keepWatching = true; + } + + public void setKeepWatching(boolean keepWatching) { + this.keepWatching = keepWatching; + } + + @SuppressWarnings("unchecked") + @Override + public Integer call() { + while (keepWatching) { + WatchKey key = null; + try { + key = watchService.take(); + for (WatchEvent watchEvent : key.pollEvents()) { + WatchEvent pathWatchEvent = (WatchEvent) watchEvent; + Path path = pathWatchEvent.context(); + Matcher matcher = pattern.matcher(path.toString()); + if (!matcher.matches()) { + continue; + } + Path p = base.resolve(path); + try (SeekableByteChannel channel = Files.newByteChannel(p, StandardOpenOption.READ)) { + long lastSize = fileSizes.getOrDefault(p, 0L); + long currentSize = p.toFile().length(); + fileSizes.put(p, currentSize); + // We have no idea where to start reading if this is the first time. + // Avoid reading the whole file at first time, read only real diff. + // This means first event is swallowed! + if (lastSize > 0L) { + String content = readRange(channel, lastSize, currentSize); + // split content by line, this allows pattern matching without preprocessing in worker + for (String line : content.split("\n")) { + FileFollowEvent event = eventClass.getDeclaredConstructor().newInstance(); + event.setKey(path.toString()); + event.setPath(base); + event.setMap(new LinkedHashMap<>()); + event.getMap().putAll(settings.getAsStructuredMap()); + event.getMap().put("content", line); + eventBus.post(event); + eventCount++; + } + } + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + logger.log(Level.SEVERE, e.getMessage(), e); + } finally { + if (key != null) { + key.reset(); + } + } + } + return eventCount; + } + + @Override + public void close() throws IOException { + watchService.close(); + } + + private static String readRange(SeekableByteChannel fileChannel, long from, long to) throws IOException { + int delta = (int) (to - from); + if (delta <= 0) { + return ""; + } + ByteBuffer byteBuffer = ByteBuffer.allocate(delta); + fileChannel.position(from); + int numRead = fileChannel.read(byteBuffer); + byteBuffer.flip(); + CharBuffer chb = StandardCharsets.UTF_8.decode(byteBuffer); + byteBuffer.clear(); + if (numRead <= 0) { + throw new IOException("numRead less or equal to 0"); + } + return chb.toString(); + } + + private static class LimitedMap extends LinkedHashMap { + + private final int n; + + LimitedMap(int n) { + super(); + this.n = n; + } + + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() > n; + } + } +}