From 788ce1b7d7859d7211f2b7bcb38271da5c464561 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Prante?= Date: Thu, 1 Feb 2024 16:51:08 +0100 Subject: [PATCH] working on path events --- .../src/main/java/org/xbib/event/Payload.java | 1 + .../org/xbib/event/common/EventManager.java | 67 +++++++++++++---- .../event/path/PathEventManagerService.java | 37 ++++++---- .../org/xbib/event/path/PathEventService.java | 72 +++++++++++++------ .../xbib/event/path/PathEventManagerTest.java | 40 +++++++++-- .../event/path/TestPathEventConsumer.java | 2 +- 6 files changed, 165 insertions(+), 54 deletions(-) diff --git a/event-api/src/main/java/org/xbib/event/Payload.java b/event-api/src/main/java/org/xbib/event/Payload.java index 85c135a..de6d3b4 100644 --- a/event-api/src/main/java/org/xbib/event/Payload.java +++ b/event-api/src/main/java/org/xbib/event/Payload.java @@ -3,6 +3,7 @@ package org.xbib.event; import java.util.LinkedHashMap; import java.util.Map; +@SuppressWarnings("serial") public class Payload extends LinkedHashMap { public Payload() { diff --git a/event-common/src/main/java/org/xbib/event/common/EventManager.java b/event-common/src/main/java/org/xbib/event/common/EventManager.java index 7e66a2a..7343f1d 100644 --- a/event-common/src/main/java/org/xbib/event/common/EventManager.java +++ b/event-common/src/main/java/org/xbib/event/common/EventManager.java @@ -244,6 +244,26 @@ public final class EventManager { } } + public static Event eventOf(String eventType, + String code, + String message, + Path path) { + return eventBuilder() + .setType(eventType) + .setCode(code) + .setMessage(message) + .setPath(path) + .build(); + } + + public static Event eventOf(String eventType, + Instant scheduled) { + return eventBuilder() + .setType(eventType) + .setScheduledFor(scheduled) + .build(); + } + public static Event eventFromFile(Path file) throws IOException { return eventFromJson(Files.readString(file)); } @@ -333,20 +353,35 @@ public final class EventManager { } public void success() throws IOException { - if (builder.path != null) { - Files.setLastModifiedTime(builder.path, FileTime.from(Instant.now())); - Files.move(builder.path, builder.path.getParent().resolve(Event.SUCCESS) - .resolve(builder.path.getFileName()).toAbsolutePath(), + Path path = builder.path; + if (path != null) { + Path root = findRoot(path); + Files.setLastModifiedTime(path, FileTime.from(Instant.now())); + Files.move(path, root.resolve(Event.SUCCESS).resolve(builder.path.getFileName()).toAbsolutePath(), StandardCopyOption.REPLACE_EXISTING); - } } public void fail() throws IOException { - Files.setLastModifiedTime(builder.path, FileTime.from(Instant.now())); - Files.move(builder.path, builder.path.getParent().resolve(Event.FAIL) - .resolve(builder.path.getFileName()).toAbsolutePath(), - StandardCopyOption.REPLACE_EXISTING); + Path path = builder.path; + if (path != null) { + Path root = findRoot(path); + Files.setLastModifiedTime(path, FileTime.from(Instant.now())); + Files.move(path, root.resolve(Event.FAIL).resolve(builder.path.getFileName()).toAbsolutePath(), + StandardCopyOption.REPLACE_EXISTING); + } + } + + private Path findRoot(Path path) { + if (path == null) { + return null; + } + if (path.getParent().endsWith(Event.INCOMING) || + path.getParent().endsWith(Event.SUCCESS) || + path.getParent().endsWith(Event.FAIL)) { + return path.getParent().getParent(); + } + return path; } } @@ -419,14 +454,18 @@ public final class EventManager { return this; } - public EventBuilder setPath(Path path) throws IOException { + public EventBuilder setPath(Path path) { this.path = path; base = getBase(path); suffix = getSuffix(path); if (Files.exists(path)) { - fileSize = Files.size(path); - if (maxFileSize != -1L && fileSize > maxFileSize) { - throw new IOException("file size too large"); + try { + fileSize = Files.size(path); + if (maxFileSize != -1L && fileSize > maxFileSize) { + throw new IllegalArgumentException("file size too large"); + } + } catch (IOException e) { + throw new IllegalStateException("unable to determine file size"); } } else { fileSize = -1L; @@ -451,7 +490,7 @@ public final class EventManager { } } catch (NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) { - logger.log(Level.WARNING, "unable to construct event object for type " + type + ", falling back to NullEvent"); + logger.log(Level.WARNING, "unable to construct event object for type " + type + ", falling back to NullEvent", e); return new NullEvent(this); } } diff --git a/event-common/src/main/java/org/xbib/event/path/PathEventManagerService.java b/event-common/src/main/java/org/xbib/event/path/PathEventManagerService.java index 2b2df7b..27c1ed3 100644 --- a/event-common/src/main/java/org/xbib/event/path/PathEventManagerService.java +++ b/event-common/src/main/java/org/xbib/event/path/PathEventManagerService.java @@ -53,7 +53,8 @@ public class PathEventManagerService implements EventManagerService { if (definition.getAsBoolean("enabled", true)) { Path path = Paths.get(definition.get("path", "/var/tmp/" + name)); TimeValue lifetime = definition.getAsTime("lifetime", TimeValue.timeValueHours(72)); - createPathEventService(name, path, lifetime); + String eventType = definition.get("type", "path"); + createPathEventService(name, path, eventType, lifetime); } else { logger.log(Level.WARNING, "path servive definition not enabled in configuration"); } @@ -64,12 +65,9 @@ public class PathEventManagerService implements EventManagerService { return this; } - public void createPathEventService(String name, - Path path, - TimeValue lifetime) - throws IOException { + public void createPathEventService(String name, Path path, String eventType, TimeValue lifetime) throws IOException { createQueue(name, path); - PathEventService pathEventService = new PathEventService(this, eventBus, name, path, lifetime); + PathEventService pathEventService = new PathEventService(this, eventBus, name, path, eventType, lifetime); add(pathEventService); } @@ -92,6 +90,19 @@ public class PathEventManagerService implements EventManagerService { }); } + public void destroy() { + logger.log(Level.INFO, "shutting down and destroying all path event files"); + eventServiceMap.forEach((k, v) -> { + k.cancel(true); + try { + v.close(); + v.destroy(); + } catch (IOException e) { + logger.log(Level.SEVERE, e.getMessage(), e); + } + }); + } + public List getSuspendedQueues() { return suspendedQueues; } @@ -163,15 +174,15 @@ public class PathEventManagerService implements EventManagerService { } private static void createQueue(String name, Path p) throws IOException { + logger.log(Level.FINE, "creating queue " + name + " at " + p); if (!Files.exists(p)) { - logger.log(Level.FINE, "creating queue " + name + " at " + p); Files.createDirectories(p); - for (String s: List.of(Event.INCOMING, Event.SUCCESS, Event.FAIL)) { - Path dir = p.resolve(s); - if (!Files.exists(dir)) { - logger.log(Level.FINE, "creating queue " + name + " dir " + dir); - Files.createDirectories(dir); - } + } + for (String s: List.of(Event.INCOMING, Event.SUCCESS, Event.FAIL)) { + Path dir = p.resolve(s); + if (!Files.exists(dir)) { + logger.log(Level.FINE, "creating queue " + name + " dir " + dir); + Files.createDirectories(dir); } } } diff --git a/event-common/src/main/java/org/xbib/event/path/PathEventService.java b/event-common/src/main/java/org/xbib/event/path/PathEventService.java index 00528ed..02397eb 100644 --- a/event-common/src/main/java/org/xbib/event/path/PathEventService.java +++ b/event-common/src/main/java/org/xbib/event/path/PathEventService.java @@ -1,5 +1,7 @@ package org.xbib.event.path; +import java.nio.file.FileVisitResult; +import java.nio.file.SimpleFileVisitor; import org.xbib.datastructures.api.TimeValue; import org.xbib.event.Event; import org.xbib.event.bus.EventBus; @@ -27,7 +29,6 @@ public class PathEventService implements Callable, Closeable { private static final Logger logger = Logger.getLogger(PathEventService.class.getName()); - private final PathEventManagerService pathEventManager; private final EventBus eventBus; @@ -36,6 +37,8 @@ public class PathEventService implements Callable, Closeable { private final String name; + private final String eventType; + private final TimeValue lifetime; private final WatchService watchService; @@ -48,25 +51,22 @@ public class PathEventService implements Callable, Closeable { EventBus eventBus, String name, Path path, + String eventType, TimeValue lifetime) throws IOException { this.pathEventManager = pathEventManager; this.eventBus = eventBus; this.name = name; this.path = path; + this.eventType = eventType; this.lifetime = lifetime; - drainIncoming(); this.watchService = path.getFileSystem().newWatchService(); - WatchEvent.Kind[] kinds = new WatchEvent.Kind[] { StandardWatchEventKinds.ENTRY_CREATE }; - WatchKey watchKey = path.resolve(Event.INCOMING).register(watchService, kinds); + Path incoming = path.resolve(Event.INCOMING); + WatchEvent.Kind[] kinds = new WatchEvent.Kind[] { StandardWatchEventKinds.ENTRY_CREATE }; + WatchKey watchKey = incoming.register(watchService, kinds); keepWatching = true; - logger.log(Level.INFO, "path event service created for incoming files at " + path); + logger.log(Level.INFO, "path event service created for files at " + path); } - public String getName() { - return name; - } - - @SuppressWarnings("unchecked") @Override public Integer call() { @@ -81,7 +81,7 @@ public class PathEventService implements Callable, Closeable { continue; } // we sleep here a bit, to give time to the OS to complete file writing - Thread.sleep(1000L); + //Thread.sleep(100L); WatchEvent pathWatchEvent = (WatchEvent) watchEvent; String watchEventContext = pathWatchEvent.context().toString(); Path p = path.resolve(Event.INCOMING).resolve(watchEventContext); @@ -114,6 +114,10 @@ public class PathEventService implements Callable, Closeable { } } + public void destroy() { + delete(path); + } + public void drainIncoming() throws IOException { try (DirectoryStream stream = Files.newDirectoryStream(path.resolve(Event.INCOMING))) { stream.forEach(this::postEvent); @@ -124,7 +128,7 @@ public class PathEventService implements Callable, Closeable { purge(path); } - public void purge(Path path) throws IOException { + private void purge(Path path) throws IOException { try (DirectoryStream stream = Files.newDirectoryStream(path)) { stream.forEach(p -> { try { @@ -145,14 +149,10 @@ public class PathEventService implements Callable, Closeable { } } - private void postEvent(Path file) { - try { - Event event = EventManager.eventFromFile(file); - eventBus.post(event); - eventCount++; - } catch (IOException e) { - logger.log(Level.SEVERE, "ignoring event post because of " + e.getMessage()); - } + private void postEvent(Path path) { + Event event = EventManager.eventOf(eventType, null, null, path); + eventBus.post(event); + eventCount++; } private void failEvent(Path file) { @@ -160,7 +160,37 @@ public class PathEventService implements Callable, Closeable { Event event = EventManager.eventFromFile(file); event.fail(); } catch (IOException e) { - logger.log(Level.SEVERE, "unable to fail event because of " + e.getMessage()); + logger.log(Level.SEVERE, "unable to fail event because of " + e.getMessage(), e); + } + } + + private static void delete(Path path) { + if (path == null) { + return; + } + if (!Files.exists(path)) { + return; + } + // delete sub trees + try { + Files.walkFileTree(path, new SimpleFileVisitor<>() { + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException { + Files.delete(file); + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException { + Files.delete(dir); + return FileVisitResult.CONTINUE; + } + }); + // and finally, delete the path + Files.deleteIfExists(path); + logger.log(Level.FINE, "deleted " + path); + } catch (IOException e) { + throw new UncheckedIOException(e); } } } diff --git a/event-common/src/test/java/org/xbib/event/path/PathEventManagerTest.java b/event-common/src/test/java/org/xbib/event/path/PathEventManagerTest.java index 504fff3..d2fcc25 100644 --- a/event-common/src/test/java/org/xbib/event/path/PathEventManagerTest.java +++ b/event-common/src/test/java/org/xbib/event/path/PathEventManagerTest.java @@ -1,7 +1,9 @@ package org.xbib.event.path; import org.junit.jupiter.api.Test; +import org.xbib.event.PathEvent; import org.xbib.event.common.EventManager; +import org.xbib.event.common.PathEventImpl; import org.xbib.settings.Settings; import java.io.BufferedWriter; @@ -27,15 +29,43 @@ public class PathEventManagerTest { .setSettings(settings) .register(consumer) .build(); - Thread.sleep(1000L); Path testTxt = path.resolve("incoming").resolve("test.txt"); try (BufferedWriter bufferedWriter = Files.newBufferedWriter(testTxt)) { bufferedWriter.write("Hello"); - logger.log(Level.INFO, "Hello written"); } - Thread.sleep(1000L); - Files.delete(testTxt); - Files.delete(path); + Thread.sleep(2000L); eventManager.close(); + eventManager.getPathEventManagerService().destroy(); + } + + @Test + public void testExtendedPathEvent() throws Exception { + Path path = Files.createTempDirectory("testpath"); + TestPathEventConsumer consumer = new TestPathEventConsumer(); + Settings settings = Settings.settingsBuilder() + .put("event.path.testpathevent.enabled", "true") + .put("event.path.testpathevent.path", path.toString()) + .put("event.path.testpathevent.type", "path-ext") + .build(); + EventManager eventManager = EventManager.builder() + .setSettings(settings) + .register("path-ext", PathExtEvent.class) + .register(consumer) + .build(); + Path testTxt = path.resolve("incoming").resolve("test.txt"); + try (BufferedWriter bufferedWriter = Files.newBufferedWriter(testTxt)) { + bufferedWriter.write("Hello"); + } + Thread.sleep(2000L); + eventManager.close(); + eventManager.getPathEventManagerService().destroy(); + } + + public static class PathExtEvent extends PathEventImpl implements PathEvent { + + public PathExtEvent(EventManager.EventBuilder builder) { + super(builder); + logger.log(Level.INFO, "I'm the path ext event"); + } } } diff --git a/event-common/src/test/java/org/xbib/event/path/TestPathEventConsumer.java b/event-common/src/test/java/org/xbib/event/path/TestPathEventConsumer.java index 68d5808..71e0284 100644 --- a/event-common/src/test/java/org/xbib/event/path/TestPathEventConsumer.java +++ b/event-common/src/test/java/org/xbib/event/path/TestPathEventConsumer.java @@ -15,6 +15,6 @@ public class TestPathEventConsumer implements EventConsumer { @Subscribe @AllowConcurrentEvents void onEvent(PathEvent event) { - logger.log(Level.INFO, "received path event, path = " + event.getPath() + " content = " + event.getMessage()); + logger.log(Level.INFO, "received path event, path = " + event.getPath()); } }