From 65d6f33edb3b41f11f38e78e656fca5fa2cdc987 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Prante?= Date: Wed, 31 Jan 2024 18:58:59 +0100 Subject: [PATCH] working on path events --- .../org/xbib/event/common/EventManager.java | 11 ++-- .../event/path/PathEventManagerService.java | 57 ++++++++----------- .../generic/GenericEventManagerTest.java | 10 +++- .../path/FileFollowEventManagerTest.java | 9 +-- .../xbib/event/path/PathEventManagerTest.java | 41 +++++++++++++ .../event/path/TestPathEventConsumer.java | 20 +++++++ 6 files changed, 104 insertions(+), 44 deletions(-) create mode 100644 event-common/src/test/java/org/xbib/event/path/PathEventManagerTest.java create mode 100644 event-common/src/test/java/org/xbib/event/path/TestPathEventConsumer.java diff --git a/event-common/src/main/java/org/xbib/event/common/EventManager.java b/event-common/src/main/java/org/xbib/event/common/EventManager.java index 30e7a79..7e66a2a 100644 --- a/event-common/src/main/java/org/xbib/event/common/EventManager.java +++ b/event-common/src/main/java/org/xbib/event/common/EventManager.java @@ -13,7 +13,6 @@ import java.util.ServiceLoader; import org.xbib.datastructures.json.tiny.Json; import org.xbib.event.Event; import org.xbib.event.EventConsumer; -import org.xbib.event.FileFollowEvent; import org.xbib.event.Listener; import org.xbib.event.Payload; import org.xbib.event.bus.AsyncEventBus; @@ -424,9 +423,13 @@ public final class EventManager { this.path = path; base = getBase(path); suffix = getSuffix(path); - fileSize = Files.size(path); - if (maxFileSize != -1L && fileSize > maxFileSize) { - throw new IOException("file size too large"); + if (Files.exists(path)) { + fileSize = Files.size(path); + if (maxFileSize != -1L && fileSize > maxFileSize) { + throw new IOException("file size too large"); + } + } else { + fileSize = -1L; } return this; } diff --git a/event-common/src/main/java/org/xbib/event/path/PathEventManagerService.java b/event-common/src/main/java/org/xbib/event/path/PathEventManagerService.java index ca11d58..2b2df7b 100644 --- a/event-common/src/main/java/org/xbib/event/path/PathEventManagerService.java +++ b/event-common/src/main/java/org/xbib/event/path/PathEventManagerService.java @@ -32,8 +32,6 @@ public class PathEventManagerService implements EventManagerService { private ExecutorService executorService; - private Path path; - private Map, PathEventService> eventServiceMap; private List suspendedQueues; @@ -41,24 +39,21 @@ public class PathEventManagerService implements EventManagerService { public PathEventManagerService() { } - @SuppressWarnings("unchecked") @Override public PathEventManagerService init(EventManager eventManager) { Settings settings = eventManager.getSettings(); - ClassLoader classLoader = eventManager.getClassLoader(); this.eventBus = eventManager.getEventBus(); this.executorService = eventManager.getExecutorService(); this.eventServiceMap = new LinkedHashMap<>(); this.suspendedQueues = new ArrayList<>(); - this.path = Paths.get(settings.get("event.pathevent.base", "/var/tmp/pathevent")); for (Map.Entry entry : settings.getGroups("event.path").entrySet()) { try { String name = entry.getKey(); Settings definition = entry.getValue(); if (definition.getAsBoolean("enabled", true)) { + Path path = Paths.get(definition.get("path", "/var/tmp/" + name)); TimeValue lifetime = definition.getAsTime("lifetime", TimeValue.timeValueHours(72)); - Path p = path.resolve(name); - createPathEventService(name, p, lifetime); + createPathEventService(name, path, lifetime); } else { logger.log(Level.WARNING, "path servive definition not enabled in configuration"); } @@ -109,60 +104,56 @@ public class PathEventManagerService implements EventManagerService { suspendedQueues.remove(queue); } - public boolean put(String queue, String key, Map map) throws IOException { - return put(queue, key, ".json", Json.toString(map)); + public boolean put(Path path, String key, Map map) throws IOException { + return put(path, key, ".json", Json.toString(map)); } - public boolean putIfNotExists(String queue, String key, Map map) throws IOException { - if (!exists(queue, key, ".json")) { - return put(queue, key, ".json", Json.toString(map)); + public boolean putIfNotExists(Path path, String key, Map map) throws IOException { + if (!exists(path, key, ".json")) { + return put(path, key, ".json", Json.toString(map)); } else { return false; } } - public boolean put(String queue, String key, String suffix, String string) throws IOException { + public boolean put(Path path, String key, String suffix, String string) throws IOException { String keyFileName = key + suffix; - Path queuePath = path.resolve(queue); - if (Files.exists(queuePath.resolve(Event.INCOMING).resolve(keyFileName)) || - Files.exists(queuePath.resolve(Event.SUCCESS).resolve(keyFileName))) { + if (Files.exists(path.resolve(Event.INCOMING).resolve(keyFileName)) || + Files.exists(path.resolve(Event.SUCCESS).resolve(keyFileName))) { logger.log(Level.WARNING, "key " + key + " already exists"); return false; } - Path p = queuePath.resolve(Event.INCOMING).resolve(keyFileName); + Path p = path.resolve(Event.INCOMING).resolve(keyFileName); try (Writer writer = Files.newBufferedWriter(p)) { writer.write(string); } // obligatory purge. This is hacky. eventServiceMap.forEach((k, v) -> { - if (v.getName().equals(queue)) { - try { - v.purge(); - } catch (IOException e) { - throw new UncheckedIOException(e); - } + try { + v.purge(); + } catch (IOException e) { + throw new UncheckedIOException(e); } }); return true; } - public boolean exists(String queue, String key, String suffix) { + public boolean exists(Path path, String key, String suffix) { String keyFileName = key + suffix; - Path queuePath = path.resolve(queue); - return Files.exists(queuePath.resolve(Event.INCOMING).resolve(keyFileName)) || - Files.exists(queuePath.resolve(Event.SUCCESS).resolve(keyFileName)); + return Files.exists(path.resolve(Event.INCOMING).resolve(keyFileName)) || + Files.exists(path.resolve(Event.SUCCESS).resolve(keyFileName)); } - public long sizeOfIncoming(String queue) throws IOException { - return sizeOf(path.resolve(queue).resolve(Event.INCOMING)); + public long sizeOfIncoming(Path path) throws IOException { + return sizeOf(path.resolve(Event.INCOMING)); } - public long sizeOfSuccess(String queue) throws IOException { - return sizeOf(path.resolve(queue).resolve(Event.SUCCESS)); + public long sizeOfSuccess(Path path) throws IOException { + return sizeOf(path.resolve(Event.SUCCESS)); } - public long sizeOfFail(String queue) throws IOException { - return sizeOf(path.resolve(queue).resolve(Event.FAIL)); + public long sizeOfFail(Path path) throws IOException { + return sizeOf(path.resolve(Event.FAIL)); } public static long sizeOf(Path path) throws IOException { diff --git a/event-common/src/test/java/org/xbib/event/generic/GenericEventManagerTest.java b/event-common/src/test/java/org/xbib/event/generic/GenericEventManagerTest.java index e9d5e9b..6dbcfe5 100644 --- a/event-common/src/test/java/org/xbib/event/generic/GenericEventManagerTest.java +++ b/event-common/src/test/java/org/xbib/event/generic/GenericEventManagerTest.java @@ -16,13 +16,14 @@ import java.util.logging.Level; import java.util.logging.Logger; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; public class GenericEventManagerTest { private static final Logger logger = Logger.getLogger(GenericEventManagerTest.class.getName()); @Test - void testSimpleGenericEvent() { + void testSimpleGenericEvent() throws InterruptedException { TestEventConsumer consumer = new TestEventConsumer(); EventManager eventManager = EventManager.builder() .register(consumer) @@ -32,10 +33,11 @@ public class GenericEventManagerTest { .setListener(e -> logger.log(Level.INFO, "received event " + e)) .build(); eventManager.getGenericEventManagerService().post(event); + // we must wait for a certain time because we do not use a future + Thread.sleep(500L); assertEquals(1, consumer.getCount()); } - @Test void testGenericEventWithWaitForSubscriber() throws InterruptedException, ExecutionException, TimeoutException { TestEventConsumer consumer = new TestEventConsumer(); @@ -52,6 +54,7 @@ public class GenericEventManagerTest { .build(); eventManager.getGenericEventManagerService().post(event); Event e = future.get(1000L, TimeUnit.MILLISECONDS); + assertNotNull(e); } @Test @@ -76,8 +79,9 @@ public class GenericEventManagerTest { eventManager.getGenericEventManagerService().post((GenericEventImpl) event, future); Event e = future.get(5L, TimeUnit.SECONDS); if (e != null) { - logger.log(Level.INFO, "the event " + e + " was received by all consumers, continuing"); + logger.log(Level.INFO, "the event " + e + " was received by all consumers"); } + assertNotNull(e); } private static class TestEventConsumer implements EventConsumer { diff --git a/event-common/src/test/java/org/xbib/event/path/FileFollowEventManagerTest.java b/event-common/src/test/java/org/xbib/event/path/FileFollowEventManagerTest.java index 6cc51e1..22e7441 100644 --- a/event-common/src/test/java/org/xbib/event/path/FileFollowEventManagerTest.java +++ b/event-common/src/test/java/org/xbib/event/path/FileFollowEventManagerTest.java @@ -28,13 +28,14 @@ public class FileFollowEventManagerTest { .setSettings(settings) .register(consumer) .build(); - Thread.sleep(5000L); - try (BufferedWriter bufferedWriter = Files.newBufferedWriter(path.resolve("test.txt"))) { + Thread.sleep(1000L); + Path testTxt = path.resolve("test.txt"); + try (BufferedWriter bufferedWriter = Files.newBufferedWriter(testTxt)) { bufferedWriter.write("Hello"); logger.log(Level.INFO, "Hello written"); } - Thread.sleep(5000L); - Files.delete(path.resolve("test.txt")); + Thread.sleep(1000L); + Files.delete(testTxt); Files.delete(path); eventManager.close(); } diff --git a/event-common/src/test/java/org/xbib/event/path/PathEventManagerTest.java b/event-common/src/test/java/org/xbib/event/path/PathEventManagerTest.java new file mode 100644 index 0000000..504fff3 --- /dev/null +++ b/event-common/src/test/java/org/xbib/event/path/PathEventManagerTest.java @@ -0,0 +1,41 @@ +package org.xbib.event.path; + +import org.junit.jupiter.api.Test; +import org.xbib.event.common.EventManager; +import org.xbib.settings.Settings; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class PathEventManagerTest { + + private static final Logger logger = Logger.getLogger(PathEventManagerTest.class.getName()); + + @Test + public void testPathEvents() throws IOException, InterruptedException { + Path path = Files.createTempDirectory("testpath"); + TestPathEventConsumer consumer = new TestPathEventConsumer(); + Settings settings = Settings.settingsBuilder() + .put("event.path.testpathevent.enabled", "true") + .put("event.path.testpathevent.path", path.toString()) + .build(); + EventManager eventManager = EventManager.builder() + .setSettings(settings) + .register(consumer) + .build(); + Thread.sleep(1000L); + Path testTxt = path.resolve("incoming").resolve("test.txt"); + try (BufferedWriter bufferedWriter = Files.newBufferedWriter(testTxt)) { + bufferedWriter.write("Hello"); + logger.log(Level.INFO, "Hello written"); + } + Thread.sleep(1000L); + Files.delete(testTxt); + Files.delete(path); + eventManager.close(); + } +} diff --git a/event-common/src/test/java/org/xbib/event/path/TestPathEventConsumer.java b/event-common/src/test/java/org/xbib/event/path/TestPathEventConsumer.java new file mode 100644 index 0000000..68d5808 --- /dev/null +++ b/event-common/src/test/java/org/xbib/event/path/TestPathEventConsumer.java @@ -0,0 +1,20 @@ +package org.xbib.event.path; + +import org.xbib.event.EventConsumer; +import org.xbib.event.PathEvent; +import org.xbib.event.bus.AllowConcurrentEvents; +import org.xbib.event.bus.Subscribe; + +import java.util.logging.Level; +import java.util.logging.Logger; + +public class TestPathEventConsumer implements EventConsumer { + + private static final Logger logger = Logger.getLogger(TestPathEventConsumer.class.getName()); + + @Subscribe + @AllowConcurrentEvents + void onEvent(PathEvent event) { + logger.log(Level.INFO, "received path event, path = " + event.getPath() + " content = " + event.getMessage()); + } +}