From d191369efe3a7c5c7d95f8caf3e68722a720edf0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Prante?= Date: Fri, 5 Jan 2024 16:51:43 +0100 Subject: [PATCH] add suspend/resume to path event manager --- gradle.properties | 2 +- .../org/xbib/event/io/path/PathEvent.java | 4 + .../xbib/event/io/path/PathEventManager.java | 76 ++++++++++++----- .../xbib/event/io/path/PathEventService.java | 84 +++++++++++++------ 4 files changed, 122 insertions(+), 44 deletions(-) diff --git a/gradle.properties b/gradle.properties index e251cbe..7f983ff 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ group = org.xbib name = event -version = 0.0.10 +version = 0.0.11 diff --git a/src/main/java/org/xbib/event/io/path/PathEvent.java b/src/main/java/org/xbib/event/io/path/PathEvent.java index fb73693..dfb6f28 100644 --- a/src/main/java/org/xbib/event/io/path/PathEvent.java +++ b/src/main/java/org/xbib/event/io/path/PathEvent.java @@ -18,4 +18,8 @@ public interface PathEvent extends Event { String getSuffix(); + void success(); + + void fail(); + } diff --git a/src/main/java/org/xbib/event/io/path/PathEventManager.java b/src/main/java/org/xbib/event/io/path/PathEventManager.java index 6c754c0..6bef364 100644 --- a/src/main/java/org/xbib/event/io/path/PathEventManager.java +++ b/src/main/java/org/xbib/event/io/path/PathEventManager.java @@ -2,7 +2,7 @@ package org.xbib.event.io.path; import org.xbib.datastructures.api.TimeValue; import org.xbib.datastructures.json.tiny.Json; -import org.xbib.event.bus.AsyncEventBus; +import org.xbib.event.bus.EventBus; import org.xbib.settings.Settings; import java.io.Closeable; @@ -12,6 +12,7 @@ import java.io.Writer; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -25,16 +26,25 @@ public class PathEventManager implements Closeable { private static final Logger logger = Logger.getLogger(PathEventManager.class.getName()); + private final EventBus eventBus; + + private final ExecutorService executorService; + private final Path path; private final Map, PathEventService> eventServiceMap; + private final List suspendedQueues; + @SuppressWarnings("unchecked") public PathEventManager(Settings settings, - AsyncEventBus eventBus, + EventBus eventBus, ExecutorService executorService, ClassLoader classLoader) { + this.eventBus = eventBus; + this.executorService = executorService; this.eventServiceMap = new LinkedHashMap<>(); + this.suspendedQueues = new ArrayList<>(); this.path = Paths.get(settings.get("event.pathevent.base", "/var/tmp/pathevent")); for (Map.Entry entry : settings.getGroups("event.path").entrySet()) { try { @@ -46,11 +56,9 @@ public class PathEventManager implements Closeable { String className = definition.get("class", DefaultPathEvent.class.getName()); Class eventClass = (Class) classLoader.loadClass(className); Path p = path.resolve(name); - createQueue(name, p); - PathEventService pathEventService = new PathEventService(eventBus, name, p, maxBytes, lifetime, eventClass); - Future future = executorService.submit(pathEventService); - eventServiceMap.put(future, pathEventService); - logger.log(Level.INFO, "path event service " + entry.getKey() + " with path " + p + " and max " + maxBytes + " added, event class " + className); + createPathEventService(name, p, maxBytes, lifetime, eventClass); + } else { + logger.log(Level.WARNING, "path servive definition not enabled in configuration"); } } catch (Exception e) { logger.log(Level.SEVERE, "unable to create event path service " + entry.getKey() + ", reason " + e.getMessage(), e); @@ -58,18 +66,21 @@ public class PathEventManager implements Closeable { } } - private static void createQueue(String name, Path p) throws IOException { - if (!Files.exists(p)) { - logger.log(Level.FINE, "creating queue " + name + " at " + p); - Files.createDirectories(p); - for (String s: List.of(PathEventService.INCOMING, PathEventService.SUCCESS, PathEventService.FAIL)) { - Path dir = p.resolve(s); - if (!Files.exists(dir)) { - logger.log(Level.FINE, "creating queue " + name + " dir " + dir); - Files.createDirectories(dir); - } - } - } + public void createPathEventService(String name, + Path path, + int maxBytes, + TimeValue lifetime, + Class eventClass) + throws IOException { + createQueue(name, path); + PathEventService pathEventService = new PathEventService(this, eventBus, name, path, maxBytes, lifetime, eventClass); + add(pathEventService); + } + + public void add(PathEventService pathEventService) { + Future future = executorService.submit(pathEventService); + eventServiceMap.put(future, pathEventService); + logger.log(Level.INFO, "path event service " + pathEventService + " added"); } @Override @@ -85,6 +96,18 @@ public class PathEventManager implements Closeable { }); } + public List getSuspendedQueues() { + return suspendedQueues; + } + + public void suspend(String queue) { + suspendedQueues.add(queue); + } + + public void resume(String queue) { + suspendedQueues.remove(queue); + } + public boolean put(String queue, String key, Map map) throws IOException { return put(queue, key, ".json", Json.toString(map)); } @@ -109,6 +132,7 @@ public class PathEventManager implements Closeable { try (Writer writer = Files.newBufferedWriter(p)) { writer.write(string); } + // obligatory purge. This is hacky. eventServiceMap.forEach((k, v) -> { if (v.getName().equals(queue)) { try { @@ -145,4 +169,18 @@ public class PathEventManager implements Closeable { return stream.count(); } } + + private static void createQueue(String name, Path p) throws IOException { + if (!Files.exists(p)) { + logger.log(Level.FINE, "creating queue " + name + " at " + p); + Files.createDirectories(p); + for (String s: List.of(PathEventService.INCOMING, PathEventService.SUCCESS, PathEventService.FAIL)) { + Path dir = p.resolve(s); + if (!Files.exists(dir)) { + logger.log(Level.FINE, "creating queue " + name + " dir " + dir); + Files.createDirectories(dir); + } + } + } + } } diff --git a/src/main/java/org/xbib/event/io/path/PathEventService.java b/src/main/java/org/xbib/event/io/path/PathEventService.java index d1e5bf5..ad0cd70 100644 --- a/src/main/java/org/xbib/event/io/path/PathEventService.java +++ b/src/main/java/org/xbib/event/io/path/PathEventService.java @@ -7,7 +7,6 @@ import org.xbib.event.bus.EventBus; import java.io.Closeable; import java.io.IOException; import java.io.UncheckedIOException; -import java.nio.charset.StandardCharsets; import java.nio.file.ClosedWatchServiceException; import java.nio.file.DirectoryStream; import java.nio.file.Files; @@ -33,6 +32,8 @@ public class PathEventService implements Callable, Closeable { public static final String FAIL = "fail"; + private final PathEventManager pathEventManager; + private final EventBus eventBus; private final Path path; @@ -51,12 +52,14 @@ public class PathEventService implements Callable, Closeable { private volatile boolean keepWatching; - public PathEventService(EventBus eventBus, + public PathEventService(PathEventManager pathEventManager, + EventBus eventBus, String name, Path path, int maxFileSize, TimeValue lifetime, Class pathEventClass) throws IOException { + this.pathEventManager = pathEventManager; this.eventBus = eventBus; this.name = name; this.path = path; @@ -95,7 +98,11 @@ public class PathEventService implements Callable, Closeable { String watchEventContext = pathWatchEvent.context().toString(); Path p = path.resolve(INCOMING).resolve(watchEventContext); logger.log(Level.FINE, "watch event " + pathWatchEvent + " context = " + watchEventContext + " path = " + p); - postEvent(watchEventContext, p); + if (pathEventManager.getSuspendedQueues().contains(name)) { + failEvent(watchEventContext, p); + } else { + postEvent(watchEventContext, p); + } } watchKey.reset(); } @@ -157,27 +164,19 @@ public class PathEventService implements Callable, Closeable { } private void postEvent(String key, Path file) { - String base = getBase(key); - String suffix = getSuffix(key); - try { - long fileSize = Files.size(file); - if (fileSize < maxFileSize && "json".equals(suffix)) { - byte[] b = Files.readAllBytes(file); - String json = new String(b, StandardCharsets.UTF_8); - PathEvent event = pathEventClass.getConstructor().newInstance(); - event.setKey(base); - event.setFile(file); - event.setSuffix(suffix); - event.setPath(path); // remember directory for fail() and success() - event.setMap(Json.toMap(json)); - logger.log(Level.FINE, "posting new event = " + event.getClass()); - eventBus.post(event); - eventCount++; - } else { - logger.log(Level.SEVERE, "skipping post event because incoming file is too large, max file size = " + maxFileSize); - } - } catch (Exception e) { - logger.log(Level.SEVERE, e.getMessage(), e); + PathEvent event = toEvent(key, file); + if (event != null) { + logger.log(Level.FINE, "posting new event = " + event.getClass()); + eventBus.post(event); + eventCount++; + } + } + + private void failEvent(String key, Path file) { + PathEvent event = toEvent(key, file); + if (event != null) { + logger.log(Level.WARNING, "queue " + name + " suspended, event short-circuited to fail"); + event.fail(); } } @@ -190,4 +189,41 @@ public class PathEventService implements Callable, Closeable { int pos = name.lastIndexOf('.'); return pos >= 0 ? name.substring(pos + 1) : null; } + + private PathEvent toEvent(String key, Path file) { + try { + String base = getBase(key); + String suffix = getSuffix(key); + long fileSize = Files.size(file); + if (fileSize > maxFileSize) { + logger.log(Level.SEVERE, "event object ignored, too large"); + return null; + } + if ("!json".equals(suffix)) { + logger.log(Level.SEVERE, "event object ignored, no json suffix"); + return null; + } + String json = Files.readString(file); + return toEvent(base, file, suffix, json); + } catch (Exception e) { + logger.log(Level.SEVERE, e.getMessage(), e); + } + logger.log(Level.SEVERE, "event object could not be created"); + return null; + } + + private PathEvent toEvent(String base, Path file, String suffix, String json) { + try { + PathEvent event = pathEventClass.getConstructor().newInstance(); + event.setKey(base); + event.setFile(file); + event.setSuffix(suffix); + event.setPath(path); // remember directory for fail() and success() + event.setMap(Json.toMap(json)); + return event; + } catch (Exception e) { + logger.log(Level.SEVERE, e.getMessage(), e); + return null; + } + } }