diff --git a/src/main/java/org/xbib/event/EventManager.java b/src/main/java/org/xbib/event/EventManager.java index 5645f76..09695e6 100644 --- a/src/main/java/org/xbib/event/EventManager.java +++ b/src/main/java/org/xbib/event/EventManager.java @@ -5,9 +5,11 @@ import org.xbib.event.bus.SubscriberExceptionContext; import org.xbib.event.bus.SubscriberExceptionHandler; import org.xbib.event.clock.ClockEventManager; import org.xbib.event.io.file.FileFollowEventManager; +import org.xbib.event.io.path.PathEventManager; import org.xbib.event.timer.TimerEventManager; import org.xbib.settings.Settings; +import java.io.Closeable; import java.io.IOException; import java.time.ZoneId; import java.util.ArrayList; @@ -18,7 +20,7 @@ import java.util.concurrent.Executors; import java.util.logging.Level; import java.util.logging.Logger; -public final class EventManager { +public final class EventManager implements Closeable { private static final Logger logger = Logger.getLogger(EventManager.class.getName()); @@ -40,10 +42,13 @@ public final class EventManager { private final FileFollowEventManager fileFollowEventManager; + private final PathEventManager pathEventManager; + private EventManager(Settings settings) { this.clockEventManager = new ClockEventManager(settings, eventBus, classLoader); this.timerEventManager = new TimerEventManager(settings, eventBus, classLoader, ZoneId.systemDefault()); this.fileFollowEventManager = new FileFollowEventManager(settings, eventBus, executorService, classLoader); + this.pathEventManager = new PathEventManager(settings, eventBus, executorService, classLoader); } public static EventManager newEventManager(Settings settings) { @@ -72,8 +77,19 @@ public final class EventManager { return fileFollowEventManager; } + public PathEventManager getPathEventManager() { + return pathEventManager; + } + + @Override + public boolean equals(Object obj) { + return super.equals(obj); + } + + @Override public void close() throws IOException { clockEventManager.close(); + pathEventManager.close(); } private static class EventManagerExceptionHandler implements SubscriberExceptionHandler { diff --git a/src/main/java/org/xbib/event/io/file/FileFollowEventManager.java b/src/main/java/org/xbib/event/io/file/FileFollowEventManager.java index 273898f..221bf65 100644 --- a/src/main/java/org/xbib/event/io/file/FileFollowEventManager.java +++ b/src/main/java/org/xbib/event/io/file/FileFollowEventManager.java @@ -32,7 +32,7 @@ public class FileFollowEventManager { if (baseStr != null && patternStr != null) { Path base = Paths.get(baseStr); Pattern pattern = Pattern.compile(patternStr); - String className = definition.get("class", FileFollowEvent.class.getName()); + String className = definition.get("class", DefaultFileFollowEvent.class.getName()); try { Class eventClass = (Class) classLoader.loadClass(className); FileFollowEventService fileFollowEventService = new FileFollowEventService(definition, eventBus, base, pattern, eventClass); diff --git a/src/main/java/org/xbib/event/io/path/PathEventManager.java b/src/main/java/org/xbib/event/io/path/PathEventManager.java new file mode 100644 index 0000000..76e829e --- /dev/null +++ b/src/main/java/org/xbib/event/io/path/PathEventManager.java @@ -0,0 +1,146 @@ +package org.xbib.event.io.path; + +import org.xbib.datastructures.api.TimeValue; +import org.xbib.datastructures.json.tiny.Json; +import org.xbib.event.bus.AsyncEventBus; +import org.xbib.settings.Settings; + +import java.io.Closeable; +import java.io.IOException; +import java.io.UncheckedIOException; +import java.io.Writer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.logging.Level; +import java.util.logging.Logger; +import java.util.stream.Stream; + +public class PathEventManager implements Closeable { + + private static final Logger logger = Logger.getLogger(PathEventManager.class.getName()); + + private final Path path; + + private final Map, PathEventService> eventServiceMap; + + @SuppressWarnings("unchecked") + public PathEventManager(Settings settings, + AsyncEventBus eventBus, + ExecutorService executorService, + ClassLoader classLoader) { + this.eventServiceMap = new LinkedHashMap<>(); + this.path = Paths.get(settings.get("event.pathevent.base", "/var/tmp/pathevent")); + for (Map.Entry entry : settings.getGroups("event.path").entrySet()) { + try { + String name = entry.getKey(); + Settings definition = entry.getValue(); + int maxBytes = definition.getAsInt("maxfilesize", 10 * 1024 * 1024); // 10 MB + TimeValue lifetime = definition.getAsTime("lifetime", TimeValue.timeValueHours(72)); + String className = definition.get("class", DefaultPathEvent.class.getName()); + Class eventClass = (Class) classLoader.loadClass(className); + Path p = path.resolve(name); + createQueue(name, p); + PathEventService pathEventService = new PathEventService(eventBus, name, p, maxBytes, lifetime, eventClass); + Future future = executorService.submit(pathEventService); + eventServiceMap.put(future, pathEventService); + logger.log(Level.INFO, "path event service " + entry.getKey() + " with path " + p + " and max " + maxBytes + " added, event class " + className); + } catch (Exception e) { + logger.log(Level.SEVERE, "unable to create event path service " + entry.getKey() + ", reason " + e.getMessage(), e); + } + } + } + + private static void createQueue(String name, Path p) throws IOException { + if (!Files.exists(p)) { + logger.log(Level.FINE, "creating queue " + name + " at " + p); + Files.createDirectories(p); + for (String s: List.of(PathEventService.INCOMING, PathEventService.SUCCESS, PathEventService.FAIL)) { + Path dir = p.resolve(s); + if (!Files.exists(dir)) { + logger.log(Level.FINE, "creating queue " + name + " dir " + dir); + Files.createDirectories(dir); + } + } + } + } + + @Override + public void close() throws IOException { + logger.log(Level.INFO, "closing all path event services"); + eventServiceMap.forEach((k, v) -> { + k.cancel(true); + try { + v.close(); + } catch (IOException e) { + logger.log(Level.SEVERE, e.getMessage(), e); + } + }); + } + + public boolean put(String queue, String key, Map map) throws IOException { + return put(queue, key, ".json", Json.toString(map)); + } + + public boolean putIfNotExists(String queue, String key, Map map) throws IOException { + if (!exists(queue, key, ".json")) { + return put(queue, key, ".json", Json.toString(map)); + } else { + return false; + } + } + + public boolean put(String queue, String key, String suffix, String string) throws IOException { + String keyFileName = key + suffix; + Path queuePath = path.resolve(queue); + if (Files.exists(queuePath.resolve(PathEventService.INCOMING).resolve(keyFileName)) || + Files.exists(queuePath.resolve(PathEventService.SUCCESS).resolve(keyFileName))) { + logger.log(Level.WARNING, "key " + key + " already exists"); + return false; + } + Path p = queuePath.resolve(PathEventService.INCOMING).resolve(keyFileName); + try (Writer writer = Files.newBufferedWriter(p)) { + writer.write(string); + } + eventServiceMap.forEach((k, v) -> { + if (v.getName().equals(queue)) { + try { + v.purge(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + }); + return true; + } + + public boolean exists(String queue, String key, String suffix) { + String keyFileName = key + suffix; + Path queuePath = path.resolve(queue); + return Files.exists(queuePath.resolve(PathEventService.INCOMING).resolve(keyFileName)) || + Files.exists(queuePath.resolve(PathEventService.SUCCESS).resolve(keyFileName)); + } + + public long sizeOfIncoming(String queue) throws IOException { + return sizeOf(path.resolve(queue).resolve(PathEventService.INCOMING)); + } + + public long sizeOfSuccess(String queue) throws IOException { + return sizeOf(path.resolve(queue).resolve(PathEventService.SUCCESS)); + } + + public long sizeOfFail(String queue) throws IOException { + return sizeOf(path.resolve(queue).resolve(PathEventService.FAIL)); + } + + public static long sizeOf(Path path) throws IOException { + try (Stream stream = Files.find(path, 1, (p, basicFileAttributes) -> Files.isRegularFile(p))) { + return stream.count(); + } + } +} diff --git a/src/main/java/org/xbib/event/io/path/PathEventService.java b/src/main/java/org/xbib/event/io/path/PathEventService.java index 1925cc0..d1e5bf5 100644 --- a/src/main/java/org/xbib/event/io/path/PathEventService.java +++ b/src/main/java/org/xbib/event/io/path/PathEventService.java @@ -1,10 +1,12 @@ package org.xbib.event.io.path; +import org.xbib.datastructures.api.TimeValue; import org.xbib.datastructures.json.tiny.Json; import org.xbib.event.bus.EventBus; import java.io.Closeable; import java.io.IOException; +import java.io.UncheckedIOException; import java.nio.charset.StandardCharsets; import java.nio.file.ClosedWatchServiceException; import java.nio.file.DirectoryStream; @@ -14,7 +16,9 @@ import java.nio.file.StandardWatchEventKinds; import java.nio.file.WatchEvent; import java.nio.file.WatchKey; import java.nio.file.WatchService; -import java.util.LinkedHashMap; +import java.nio.file.attribute.BasicFileAttributeView; +import java.nio.file.attribute.BasicFileAttributes; +import java.time.Instant; import java.util.concurrent.Callable; import java.util.logging.Level; import java.util.logging.Logger; @@ -33,7 +37,11 @@ public class PathEventService implements Callable, Closeable { private final Path path; + private final String name; + private final int maxFileSize; + + private final TimeValue lifetime; ; private final Class pathEventClass; @@ -44,12 +52,16 @@ public class PathEventService implements Callable, Closeable { private volatile boolean keepWatching; public PathEventService(EventBus eventBus, + String name, Path path, int maxFileSize, + TimeValue lifetime, Class pathEventClass) throws IOException { this.eventBus = eventBus; + this.name = name; this.path = path; this.maxFileSize = maxFileSize; + this.lifetime = lifetime; this.pathEventClass = pathEventClass; drainIncoming(); this.watchService = path.getFileSystem().newWatchService(); @@ -59,6 +71,11 @@ public class PathEventService implements Callable, Closeable { logger.log(Level.INFO, "path event service created for incoming files at " + path + " max file size = " + maxFileSize); } + public String getName() { + return name; + } + + @SuppressWarnings("unchecked") @Override public Integer call() { @@ -102,7 +119,7 @@ public class PathEventService implements Callable, Closeable { } } - private void drainIncoming() throws IOException { + public void drainIncoming() throws IOException { try (DirectoryStream stream = Files.newDirectoryStream(path.resolve(INCOMING))) { stream.forEach(path -> { if (Files.isRegularFile(path)) { @@ -114,6 +131,31 @@ public class PathEventService implements Callable, Closeable { } } + public void purge() throws IOException { + purge(path); + } + + public void purge(Path path) throws IOException { + try (DirectoryStream stream = Files.newDirectoryStream(path)) { + stream.forEach(p -> { + try { + if (Files.isRegularFile(p)) { + BasicFileAttributeView view = Files.getFileAttributeView(p, BasicFileAttributeView.class); + BasicFileAttributes attrs = view.readAttributes(); + if (Instant.now().minusMillis(attrs.lastModifiedTime().toMillis()).toEpochMilli() > lifetime.millis()) { + logger.log(Level.WARNING, "lifetime of " + lifetime + " exceeded, deleting " + p); + Files.delete(p); + } + } else { + purge(p); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + } + } + private void postEvent(String key, Path file) { String base = getBase(key); String suffix = getSuffix(key);