working on path events

This commit is contained in:
Jörg Prante 2024-02-01 16:51:08 +01:00
parent 65d6f33edb
commit 788ce1b7d7
6 changed files with 165 additions and 54 deletions

View file

@ -3,6 +3,7 @@ package org.xbib.event;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
@SuppressWarnings("serial")
public class Payload extends LinkedHashMap<String, Object> { public class Payload extends LinkedHashMap<String, Object> {
public Payload() { public Payload() {

View file

@ -244,6 +244,26 @@ public final class EventManager {
} }
} }
public static Event eventOf(String eventType,
String code,
String message,
Path path) {
return eventBuilder()
.setType(eventType)
.setCode(code)
.setMessage(message)
.setPath(path)
.build();
}
public static Event eventOf(String eventType,
Instant scheduled) {
return eventBuilder()
.setType(eventType)
.setScheduledFor(scheduled)
.build();
}
public static Event eventFromFile(Path file) throws IOException { public static Event eventFromFile(Path file) throws IOException {
return eventFromJson(Files.readString(file)); return eventFromJson(Files.readString(file));
} }
@ -333,20 +353,35 @@ public final class EventManager {
} }
public void success() throws IOException { public void success() throws IOException {
if (builder.path != null) { Path path = builder.path;
Files.setLastModifiedTime(builder.path, FileTime.from(Instant.now())); if (path != null) {
Files.move(builder.path, builder.path.getParent().resolve(Event.SUCCESS) Path root = findRoot(path);
.resolve(builder.path.getFileName()).toAbsolutePath(), Files.setLastModifiedTime(path, FileTime.from(Instant.now()));
Files.move(path, root.resolve(Event.SUCCESS).resolve(builder.path.getFileName()).toAbsolutePath(),
StandardCopyOption.REPLACE_EXISTING); StandardCopyOption.REPLACE_EXISTING);
} }
} }
public void fail() throws IOException { public void fail() throws IOException {
Files.setLastModifiedTime(builder.path, FileTime.from(Instant.now())); Path path = builder.path;
Files.move(builder.path, builder.path.getParent().resolve(Event.FAIL) if (path != null) {
.resolve(builder.path.getFileName()).toAbsolutePath(), Path root = findRoot(path);
StandardCopyOption.REPLACE_EXISTING); Files.setLastModifiedTime(path, FileTime.from(Instant.now()));
Files.move(path, root.resolve(Event.FAIL).resolve(builder.path.getFileName()).toAbsolutePath(),
StandardCopyOption.REPLACE_EXISTING);
}
}
private Path findRoot(Path path) {
if (path == null) {
return null;
}
if (path.getParent().endsWith(Event.INCOMING) ||
path.getParent().endsWith(Event.SUCCESS) ||
path.getParent().endsWith(Event.FAIL)) {
return path.getParent().getParent();
}
return path;
} }
} }
@ -419,14 +454,18 @@ public final class EventManager {
return this; return this;
} }
public EventBuilder setPath(Path path) throws IOException { public EventBuilder setPath(Path path) {
this.path = path; this.path = path;
base = getBase(path); base = getBase(path);
suffix = getSuffix(path); suffix = getSuffix(path);
if (Files.exists(path)) { if (Files.exists(path)) {
fileSize = Files.size(path); try {
if (maxFileSize != -1L && fileSize > maxFileSize) { fileSize = Files.size(path);
throw new IOException("file size too large"); if (maxFileSize != -1L && fileSize > maxFileSize) {
throw new IllegalArgumentException("file size too large");
}
} catch (IOException e) {
throw new IllegalStateException("unable to determine file size");
} }
} else { } else {
fileSize = -1L; fileSize = -1L;
@ -451,7 +490,7 @@ public final class EventManager {
} }
} catch (NoSuchMethodException | InstantiationException | IllegalAccessException | } catch (NoSuchMethodException | InstantiationException | IllegalAccessException |
InvocationTargetException e) { InvocationTargetException e) {
logger.log(Level.WARNING, "unable to construct event object for type " + type + ", falling back to NullEvent"); logger.log(Level.WARNING, "unable to construct event object for type " + type + ", falling back to NullEvent", e);
return new NullEvent(this); return new NullEvent(this);
} }
} }

View file

@ -53,7 +53,8 @@ public class PathEventManagerService implements EventManagerService {
if (definition.getAsBoolean("enabled", true)) { if (definition.getAsBoolean("enabled", true)) {
Path path = Paths.get(definition.get("path", "/var/tmp/" + name)); Path path = Paths.get(definition.get("path", "/var/tmp/" + name));
TimeValue lifetime = definition.getAsTime("lifetime", TimeValue.timeValueHours(72)); TimeValue lifetime = definition.getAsTime("lifetime", TimeValue.timeValueHours(72));
createPathEventService(name, path, lifetime); String eventType = definition.get("type", "path");
createPathEventService(name, path, eventType, lifetime);
} else { } else {
logger.log(Level.WARNING, "path servive definition not enabled in configuration"); logger.log(Level.WARNING, "path servive definition not enabled in configuration");
} }
@ -64,12 +65,9 @@ public class PathEventManagerService implements EventManagerService {
return this; return this;
} }
public void createPathEventService(String name, public void createPathEventService(String name, Path path, String eventType, TimeValue lifetime) throws IOException {
Path path,
TimeValue lifetime)
throws IOException {
createQueue(name, path); createQueue(name, path);
PathEventService pathEventService = new PathEventService(this, eventBus, name, path, lifetime); PathEventService pathEventService = new PathEventService(this, eventBus, name, path, eventType, lifetime);
add(pathEventService); add(pathEventService);
} }
@ -92,6 +90,19 @@ public class PathEventManagerService implements EventManagerService {
}); });
} }
public void destroy() {
logger.log(Level.INFO, "shutting down and destroying all path event files");
eventServiceMap.forEach((k, v) -> {
k.cancel(true);
try {
v.close();
v.destroy();
} catch (IOException e) {
logger.log(Level.SEVERE, e.getMessage(), e);
}
});
}
public List<String> getSuspendedQueues() { public List<String> getSuspendedQueues() {
return suspendedQueues; return suspendedQueues;
} }
@ -163,15 +174,15 @@ public class PathEventManagerService implements EventManagerService {
} }
private static void createQueue(String name, Path p) throws IOException { private static void createQueue(String name, Path p) throws IOException {
logger.log(Level.FINE, "creating queue " + name + " at " + p);
if (!Files.exists(p)) { if (!Files.exists(p)) {
logger.log(Level.FINE, "creating queue " + name + " at " + p);
Files.createDirectories(p); Files.createDirectories(p);
for (String s: List.of(Event.INCOMING, Event.SUCCESS, Event.FAIL)) { }
Path dir = p.resolve(s); for (String s: List.of(Event.INCOMING, Event.SUCCESS, Event.FAIL)) {
if (!Files.exists(dir)) { Path dir = p.resolve(s);
logger.log(Level.FINE, "creating queue " + name + " dir " + dir); if (!Files.exists(dir)) {
Files.createDirectories(dir); logger.log(Level.FINE, "creating queue " + name + " dir " + dir);
} Files.createDirectories(dir);
} }
} }
} }

View file

@ -1,5 +1,7 @@
package org.xbib.event.path; package org.xbib.event.path;
import java.nio.file.FileVisitResult;
import java.nio.file.SimpleFileVisitor;
import org.xbib.datastructures.api.TimeValue; import org.xbib.datastructures.api.TimeValue;
import org.xbib.event.Event; import org.xbib.event.Event;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.EventBus;
@ -27,7 +29,6 @@ public class PathEventService implements Callable<Integer>, Closeable {
private static final Logger logger = Logger.getLogger(PathEventService.class.getName()); private static final Logger logger = Logger.getLogger(PathEventService.class.getName());
private final PathEventManagerService pathEventManager; private final PathEventManagerService pathEventManager;
private final EventBus eventBus; private final EventBus eventBus;
@ -36,6 +37,8 @@ public class PathEventService implements Callable<Integer>, Closeable {
private final String name; private final String name;
private final String eventType;
private final TimeValue lifetime; private final TimeValue lifetime;
private final WatchService watchService; private final WatchService watchService;
@ -48,25 +51,22 @@ public class PathEventService implements Callable<Integer>, Closeable {
EventBus eventBus, EventBus eventBus,
String name, String name,
Path path, Path path,
String eventType,
TimeValue lifetime) throws IOException { TimeValue lifetime) throws IOException {
this.pathEventManager = pathEventManager; this.pathEventManager = pathEventManager;
this.eventBus = eventBus; this.eventBus = eventBus;
this.name = name; this.name = name;
this.path = path; this.path = path;
this.eventType = eventType;
this.lifetime = lifetime; this.lifetime = lifetime;
drainIncoming();
this.watchService = path.getFileSystem().newWatchService(); this.watchService = path.getFileSystem().newWatchService();
WatchEvent.Kind<?>[] kinds = new WatchEvent.Kind[] { StandardWatchEventKinds.ENTRY_CREATE }; Path incoming = path.resolve(Event.INCOMING);
WatchKey watchKey = path.resolve(Event.INCOMING).register(watchService, kinds); WatchEvent.Kind<?>[] kinds = new WatchEvent.Kind<?>[] { StandardWatchEventKinds.ENTRY_CREATE };
WatchKey watchKey = incoming.register(watchService, kinds);
keepWatching = true; keepWatching = true;
logger.log(Level.INFO, "path event service created for incoming files at " + path); logger.log(Level.INFO, "path event service created for files at " + path);
} }
public String getName() {
return name;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public Integer call() { public Integer call() {
@ -81,7 +81,7 @@ public class PathEventService implements Callable<Integer>, Closeable {
continue; continue;
} }
// we sleep here a bit, to give time to the OS to complete file writing // we sleep here a bit, to give time to the OS to complete file writing
Thread.sleep(1000L); //Thread.sleep(100L);
WatchEvent<Path> pathWatchEvent = (WatchEvent<Path>) watchEvent; WatchEvent<Path> pathWatchEvent = (WatchEvent<Path>) watchEvent;
String watchEventContext = pathWatchEvent.context().toString(); String watchEventContext = pathWatchEvent.context().toString();
Path p = path.resolve(Event.INCOMING).resolve(watchEventContext); Path p = path.resolve(Event.INCOMING).resolve(watchEventContext);
@ -114,6 +114,10 @@ public class PathEventService implements Callable<Integer>, Closeable {
} }
} }
public void destroy() {
delete(path);
}
public void drainIncoming() throws IOException { public void drainIncoming() throws IOException {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path.resolve(Event.INCOMING))) { try (DirectoryStream<Path> stream = Files.newDirectoryStream(path.resolve(Event.INCOMING))) {
stream.forEach(this::postEvent); stream.forEach(this::postEvent);
@ -124,7 +128,7 @@ public class PathEventService implements Callable<Integer>, Closeable {
purge(path); purge(path);
} }
public void purge(Path path) throws IOException { private void purge(Path path) throws IOException {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path)) { try (DirectoryStream<Path> stream = Files.newDirectoryStream(path)) {
stream.forEach(p -> { stream.forEach(p -> {
try { try {
@ -145,14 +149,10 @@ public class PathEventService implements Callable<Integer>, Closeable {
} }
} }
private void postEvent(Path file) { private void postEvent(Path path) {
try { Event event = EventManager.eventOf(eventType, null, null, path);
Event event = EventManager.eventFromFile(file); eventBus.post(event);
eventBus.post(event); eventCount++;
eventCount++;
} catch (IOException e) {
logger.log(Level.SEVERE, "ignoring event post because of " + e.getMessage());
}
} }
private void failEvent(Path file) { private void failEvent(Path file) {
@ -160,7 +160,37 @@ public class PathEventService implements Callable<Integer>, Closeable {
Event event = EventManager.eventFromFile(file); Event event = EventManager.eventFromFile(file);
event.fail(); event.fail();
} catch (IOException e) { } catch (IOException e) {
logger.log(Level.SEVERE, "unable to fail event because of " + e.getMessage()); logger.log(Level.SEVERE, "unable to fail event because of " + e.getMessage(), e);
}
}
private static void delete(Path path) {
if (path == null) {
return;
}
if (!Files.exists(path)) {
return;
}
// delete sub trees
try {
Files.walkFileTree(path, new SimpleFileVisitor<>() {
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
Files.delete(file);
return FileVisitResult.CONTINUE;
}
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
Files.delete(dir);
return FileVisitResult.CONTINUE;
}
});
// and finally, delete the path
Files.deleteIfExists(path);
logger.log(Level.FINE, "deleted " + path);
} catch (IOException e) {
throw new UncheckedIOException(e);
} }
} }
} }

View file

@ -1,7 +1,9 @@
package org.xbib.event.path; package org.xbib.event.path;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.xbib.event.PathEvent;
import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManager;
import org.xbib.event.common.PathEventImpl;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
import java.io.BufferedWriter; import java.io.BufferedWriter;
@ -27,15 +29,43 @@ public class PathEventManagerTest {
.setSettings(settings) .setSettings(settings)
.register(consumer) .register(consumer)
.build(); .build();
Thread.sleep(1000L);
Path testTxt = path.resolve("incoming").resolve("test.txt"); Path testTxt = path.resolve("incoming").resolve("test.txt");
try (BufferedWriter bufferedWriter = Files.newBufferedWriter(testTxt)) { try (BufferedWriter bufferedWriter = Files.newBufferedWriter(testTxt)) {
bufferedWriter.write("Hello"); bufferedWriter.write("Hello");
logger.log(Level.INFO, "Hello written");
} }
Thread.sleep(1000L); Thread.sleep(2000L);
Files.delete(testTxt);
Files.delete(path);
eventManager.close(); eventManager.close();
eventManager.getPathEventManagerService().destroy();
}
@Test
public void testExtendedPathEvent() throws Exception {
Path path = Files.createTempDirectory("testpath");
TestPathEventConsumer consumer = new TestPathEventConsumer();
Settings settings = Settings.settingsBuilder()
.put("event.path.testpathevent.enabled", "true")
.put("event.path.testpathevent.path", path.toString())
.put("event.path.testpathevent.type", "path-ext")
.build();
EventManager eventManager = EventManager.builder()
.setSettings(settings)
.register("path-ext", PathExtEvent.class)
.register(consumer)
.build();
Path testTxt = path.resolve("incoming").resolve("test.txt");
try (BufferedWriter bufferedWriter = Files.newBufferedWriter(testTxt)) {
bufferedWriter.write("Hello");
}
Thread.sleep(2000L);
eventManager.close();
eventManager.getPathEventManagerService().destroy();
}
public static class PathExtEvent extends PathEventImpl implements PathEvent {
public PathExtEvent(EventManager.EventBuilder builder) {
super(builder);
logger.log(Level.INFO, "I'm the path ext event");
}
} }
} }

View file

@ -15,6 +15,6 @@ public class TestPathEventConsumer implements EventConsumer {
@Subscribe @Subscribe
@AllowConcurrentEvents @AllowConcurrentEvents
void onEvent(PathEvent event) { void onEvent(PathEvent event) {
logger.log(Level.INFO, "received path event, path = " + event.getPath() + " content = " + event.getMessage()); logger.log(Level.INFO, "received path event, path = " + event.getPath());
} }
} }