add suspend/resume to path event manager

This commit is contained in:
Jörg Prante 2024-01-05 16:51:43 +01:00
parent 9309c43040
commit d191369efe
4 changed files with 122 additions and 44 deletions

View file

@ -1,3 +1,3 @@
group = org.xbib group = org.xbib
name = event name = event
version = 0.0.10 version = 0.0.11

View file

@ -18,4 +18,8 @@ public interface PathEvent extends Event {
String getSuffix(); String getSuffix();
void success();
void fail();
} }

View file

@ -2,7 +2,7 @@ package org.xbib.event.io.path;
import org.xbib.datastructures.api.TimeValue; import org.xbib.datastructures.api.TimeValue;
import org.xbib.datastructures.json.tiny.Json; import org.xbib.datastructures.json.tiny.Json;
import org.xbib.event.bus.AsyncEventBus; import org.xbib.event.bus.EventBus;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
import java.io.Closeable; import java.io.Closeable;
@ -12,6 +12,7 @@ import java.io.Writer;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -25,16 +26,25 @@ public class PathEventManager implements Closeable {
private static final Logger logger = Logger.getLogger(PathEventManager.class.getName()); private static final Logger logger = Logger.getLogger(PathEventManager.class.getName());
private final EventBus eventBus;
private final ExecutorService executorService;
private final Path path; private final Path path;
private final Map<Future<?>, PathEventService> eventServiceMap; private final Map<Future<?>, PathEventService> eventServiceMap;
private final List<String> suspendedQueues;
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public PathEventManager(Settings settings, public PathEventManager(Settings settings,
AsyncEventBus eventBus, EventBus eventBus,
ExecutorService executorService, ExecutorService executorService,
ClassLoader classLoader) { ClassLoader classLoader) {
this.eventBus = eventBus;
this.executorService = executorService;
this.eventServiceMap = new LinkedHashMap<>(); this.eventServiceMap = new LinkedHashMap<>();
this.suspendedQueues = new ArrayList<>();
this.path = Paths.get(settings.get("event.pathevent.base", "/var/tmp/pathevent")); this.path = Paths.get(settings.get("event.pathevent.base", "/var/tmp/pathevent"));
for (Map.Entry<String, Settings> entry : settings.getGroups("event.path").entrySet()) { for (Map.Entry<String, Settings> entry : settings.getGroups("event.path").entrySet()) {
try { try {
@ -46,11 +56,9 @@ public class PathEventManager implements Closeable {
String className = definition.get("class", DefaultPathEvent.class.getName()); String className = definition.get("class", DefaultPathEvent.class.getName());
Class<? extends PathEvent> eventClass = (Class<? extends PathEvent>) classLoader.loadClass(className); Class<? extends PathEvent> eventClass = (Class<? extends PathEvent>) classLoader.loadClass(className);
Path p = path.resolve(name); Path p = path.resolve(name);
createQueue(name, p); createPathEventService(name, p, maxBytes, lifetime, eventClass);
PathEventService pathEventService = new PathEventService(eventBus, name, p, maxBytes, lifetime, eventClass); } else {
Future<?> future = executorService.submit(pathEventService); logger.log(Level.WARNING, "path servive definition not enabled in configuration");
eventServiceMap.put(future, pathEventService);
logger.log(Level.INFO, "path event service " + entry.getKey() + " with path " + p + " and max " + maxBytes + " added, event class " + className);
} }
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "unable to create event path service " + entry.getKey() + ", reason " + e.getMessage(), e); logger.log(Level.SEVERE, "unable to create event path service " + entry.getKey() + ", reason " + e.getMessage(), e);
@ -58,18 +66,21 @@ public class PathEventManager implements Closeable {
} }
} }
private static void createQueue(String name, Path p) throws IOException { public void createPathEventService(String name,
if (!Files.exists(p)) { Path path,
logger.log(Level.FINE, "creating queue " + name + " at " + p); int maxBytes,
Files.createDirectories(p); TimeValue lifetime,
for (String s: List.of(PathEventService.INCOMING, PathEventService.SUCCESS, PathEventService.FAIL)) { Class<? extends PathEvent> eventClass)
Path dir = p.resolve(s); throws IOException {
if (!Files.exists(dir)) { createQueue(name, path);
logger.log(Level.FINE, "creating queue " + name + " dir " + dir); PathEventService pathEventService = new PathEventService(this, eventBus, name, path, maxBytes, lifetime, eventClass);
Files.createDirectories(dir); add(pathEventService);
} }
}
} public void add(PathEventService pathEventService) {
Future<?> future = executorService.submit(pathEventService);
eventServiceMap.put(future, pathEventService);
logger.log(Level.INFO, "path event service " + pathEventService + " added");
} }
@Override @Override
@ -85,6 +96,18 @@ public class PathEventManager implements Closeable {
}); });
} }
public List<String> getSuspendedQueues() {
return suspendedQueues;
}
public void suspend(String queue) {
suspendedQueues.add(queue);
}
public void resume(String queue) {
suspendedQueues.remove(queue);
}
public boolean put(String queue, String key, Map<String,Object> map) throws IOException { public boolean put(String queue, String key, Map<String,Object> map) throws IOException {
return put(queue, key, ".json", Json.toString(map)); return put(queue, key, ".json", Json.toString(map));
} }
@ -109,6 +132,7 @@ public class PathEventManager implements Closeable {
try (Writer writer = Files.newBufferedWriter(p)) { try (Writer writer = Files.newBufferedWriter(p)) {
writer.write(string); writer.write(string);
} }
// obligatory purge. This is hacky.
eventServiceMap.forEach((k, v) -> { eventServiceMap.forEach((k, v) -> {
if (v.getName().equals(queue)) { if (v.getName().equals(queue)) {
try { try {
@ -145,4 +169,18 @@ public class PathEventManager implements Closeable {
return stream.count(); return stream.count();
} }
} }
private static void createQueue(String name, Path p) throws IOException {
if (!Files.exists(p)) {
logger.log(Level.FINE, "creating queue " + name + " at " + p);
Files.createDirectories(p);
for (String s: List.of(PathEventService.INCOMING, PathEventService.SUCCESS, PathEventService.FAIL)) {
Path dir = p.resolve(s);
if (!Files.exists(dir)) {
logger.log(Level.FINE, "creating queue " + name + " dir " + dir);
Files.createDirectories(dir);
}
}
}
}
} }

View file

@ -7,7 +7,6 @@ import org.xbib.event.bus.EventBus;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.io.UncheckedIOException; import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.ClosedWatchServiceException; import java.nio.file.ClosedWatchServiceException;
import java.nio.file.DirectoryStream; import java.nio.file.DirectoryStream;
import java.nio.file.Files; import java.nio.file.Files;
@ -33,6 +32,8 @@ public class PathEventService implements Callable<Integer>, Closeable {
public static final String FAIL = "fail"; public static final String FAIL = "fail";
private final PathEventManager pathEventManager;
private final EventBus eventBus; private final EventBus eventBus;
private final Path path; private final Path path;
@ -51,12 +52,14 @@ public class PathEventService implements Callable<Integer>, Closeable {
private volatile boolean keepWatching; private volatile boolean keepWatching;
public PathEventService(EventBus eventBus, public PathEventService(PathEventManager pathEventManager,
EventBus eventBus,
String name, String name,
Path path, Path path,
int maxFileSize, int maxFileSize,
TimeValue lifetime, TimeValue lifetime,
Class<? extends PathEvent> pathEventClass) throws IOException { Class<? extends PathEvent> pathEventClass) throws IOException {
this.pathEventManager = pathEventManager;
this.eventBus = eventBus; this.eventBus = eventBus;
this.name = name; this.name = name;
this.path = path; this.path = path;
@ -95,7 +98,11 @@ public class PathEventService implements Callable<Integer>, Closeable {
String watchEventContext = pathWatchEvent.context().toString(); String watchEventContext = pathWatchEvent.context().toString();
Path p = path.resolve(INCOMING).resolve(watchEventContext); Path p = path.resolve(INCOMING).resolve(watchEventContext);
logger.log(Level.FINE, "watch event " + pathWatchEvent + " context = " + watchEventContext + " path = " + p); logger.log(Level.FINE, "watch event " + pathWatchEvent + " context = " + watchEventContext + " path = " + p);
postEvent(watchEventContext, p); if (pathEventManager.getSuspendedQueues().contains(name)) {
failEvent(watchEventContext, p);
} else {
postEvent(watchEventContext, p);
}
} }
watchKey.reset(); watchKey.reset();
} }
@ -157,27 +164,19 @@ public class PathEventService implements Callable<Integer>, Closeable {
} }
private void postEvent(String key, Path file) { private void postEvent(String key, Path file) {
String base = getBase(key); PathEvent event = toEvent(key, file);
String suffix = getSuffix(key); if (event != null) {
try { logger.log(Level.FINE, "posting new event = " + event.getClass());
long fileSize = Files.size(file); eventBus.post(event);
if (fileSize < maxFileSize && "json".equals(suffix)) { eventCount++;
byte[] b = Files.readAllBytes(file); }
String json = new String(b, StandardCharsets.UTF_8); }
PathEvent event = pathEventClass.getConstructor().newInstance();
event.setKey(base); private void failEvent(String key, Path file) {
event.setFile(file); PathEvent event = toEvent(key, file);
event.setSuffix(suffix); if (event != null) {
event.setPath(path); // remember directory for fail() and success() logger.log(Level.WARNING, "queue " + name + " suspended, event short-circuited to fail");
event.setMap(Json.toMap(json)); event.fail();
logger.log(Level.FINE, "posting new event = " + event.getClass());
eventBus.post(event);
eventCount++;
} else {
logger.log(Level.SEVERE, "skipping post event because incoming file is too large, max file size = " + maxFileSize);
}
} catch (Exception e) {
logger.log(Level.SEVERE, e.getMessage(), e);
} }
} }
@ -190,4 +189,41 @@ public class PathEventService implements Callable<Integer>, Closeable {
int pos = name.lastIndexOf('.'); int pos = name.lastIndexOf('.');
return pos >= 0 ? name.substring(pos + 1) : null; return pos >= 0 ? name.substring(pos + 1) : null;
} }
private PathEvent toEvent(String key, Path file) {
try {
String base = getBase(key);
String suffix = getSuffix(key);
long fileSize = Files.size(file);
if (fileSize > maxFileSize) {
logger.log(Level.SEVERE, "event object ignored, too large");
return null;
}
if ("!json".equals(suffix)) {
logger.log(Level.SEVERE, "event object ignored, no json suffix");
return null;
}
String json = Files.readString(file);
return toEvent(base, file, suffix, json);
} catch (Exception e) {
logger.log(Level.SEVERE, e.getMessage(), e);
}
logger.log(Level.SEVERE, "event object could not be created");
return null;
}
private PathEvent toEvent(String base, Path file, String suffix, String json) {
try {
PathEvent event = pathEventClass.getConstructor().newInstance();
event.setKey(base);
event.setFile(file);
event.setSuffix(suffix);
event.setPath(path); // remember directory for fail() and success()
event.setMap(Json.toMap(json));
return event;
} catch (Exception e) {
logger.log(Level.SEVERE, e.getMessage(), e);
return null;
}
}
} }