From 757e62040be8f5a644a30c1a4a39e0bbcb952199 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Prante?= Date: Mon, 5 Feb 2024 17:53:40 +0100 Subject: [PATCH] json format of event --- event-common/src/main/java/module-info.java | 1 - .../event/clock/ClockEventManagerService.java | 33 +++---- .../xbib/event/clock/ClockEventService.java | 25 +++--- .../common/AbstractEventManagerService.java | 25 ++++++ .../org/xbib/event/common/EventManager.java | 85 +++++++++++++++---- .../generic/GenericEventManagerService.java | 77 ----------------- .../path/FileFollowEventManagerService.java | 20 +++-- .../event/path/FileFollowEventService.java | 31 ++++--- .../event/path/PathEventManagerService.java | 64 +++++++------- .../org/xbib/event/path/PathEventService.java | 37 ++------ .../event/timer/TimerEventManagerService.java | 44 ++++++---- .../xbib/event/timer/TimerEventService.java | 20 +++-- .../event/clock/ClockEventManagerTest.java | 2 +- .../GenericEventManagerTest.java | 10 +-- .../path/FileFollowEventManagerTest.java | 2 +- .../xbib/event/path/PathEventManagerTest.java | 6 +- .../event/timer/TimerEventManagerTest.java | 2 +- .../net/http/HttpEventReceiverService.java | 4 +- 18 files changed, 239 insertions(+), 249 deletions(-) create mode 100644 event-common/src/main/java/org/xbib/event/common/AbstractEventManagerService.java delete mode 100644 event-common/src/main/java/org/xbib/event/generic/GenericEventManagerService.java rename event-common/src/test/java/org/xbib/event/{generic => common}/GenericEventManagerTest.java (90%) diff --git a/event-common/src/main/java/module-info.java b/event-common/src/main/java/module-info.java index 6eb55a4..7826d75 100644 --- a/event-common/src/main/java/module-info.java +++ b/event-common/src/main/java/module-info.java @@ -4,7 +4,6 @@ module org.xbib.event.common { exports org.xbib.event.bus; exports org.xbib.event.clock; exports org.xbib.event.common; - exports org.xbib.event.generic; exports org.xbib.event.log; exports org.xbib.event.path; exports org.xbib.event.persistence; diff --git a/event-common/src/main/java/org/xbib/event/clock/ClockEventManagerService.java b/event-common/src/main/java/org/xbib/event/clock/ClockEventManagerService.java index 631b2c4..cd43433 100644 --- a/event-common/src/main/java/org/xbib/event/clock/ClockEventManagerService.java +++ b/event-common/src/main/java/org/xbib/event/clock/ClockEventManagerService.java @@ -1,6 +1,7 @@ package org.xbib.event.clock; -import org.xbib.event.bus.EventBus; +import org.xbib.event.Event; +import org.xbib.event.common.AbstractEventManagerService; import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManagerService; import org.xbib.settings.Settings; @@ -8,8 +9,6 @@ import org.xbib.time.schedule.CronExpression; import org.xbib.time.schedule.CronSchedule; import java.io.IOException; -import java.util.ArrayList; -import java.util.List; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -17,21 +16,21 @@ import java.util.concurrent.ThreadFactory; import java.util.logging.Level; import java.util.logging.Logger; -public class ClockEventManagerService implements EventManagerService { +public class ClockEventManagerService extends AbstractEventManagerService implements EventManagerService { private static final Logger logger = Logger.getLogger(ClockEventManagerService.class.getName()); - private CronSchedule cronSchedule; + private EventManager eventManager; - private List suspended; + private CronSchedule cronSchedule; public ClockEventManagerService() { + super(); } public ClockEventManagerService init(EventManager eventManager) { + this.eventManager = eventManager; Settings settings = eventManager.getSettings(); - EventBus eventBus = eventManager.getEventBus(); - this.suspended = new ArrayList<>(); ThreadFactory threadFactory = new ThreadFactory() { int n = 1; @Override @@ -49,7 +48,7 @@ public class ClockEventManagerService implements EventManagerService { String entry = entrySettings.get("entry"); if (entry != null) { try { - ClockEventService clockEventService = new ClockEventService(this, eventBus, name); + ClockEventService clockEventService = new ClockEventService(this, name); cronSchedule.add(name, CronExpression.parse(entry), clockEventService); logger.log(Level.INFO, "cron job " + name + " scheduled on " + entry); } catch (Exception e) { @@ -65,20 +64,12 @@ public class ClockEventManagerService implements EventManagerService { return this; } - public List getSuspended() { - return suspended; - } - - public void suspend(String name) { - suspended.add(name); - } - - public void resume(String name) { - suspended.remove(name); - } - @Override public void shutdown() throws IOException { cronSchedule.close(); } + + public void publish(Event event) { + eventManager.publish(event); + } } diff --git a/event-common/src/main/java/org/xbib/event/clock/ClockEventService.java b/event-common/src/main/java/org/xbib/event/clock/ClockEventService.java index 836d214..735357c 100644 --- a/event-common/src/main/java/org/xbib/event/clock/ClockEventService.java +++ b/event-common/src/main/java/org/xbib/event/clock/ClockEventService.java @@ -5,43 +5,38 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.xbib.event.Event; -import org.xbib.event.bus.EventBus; import org.xbib.event.common.EventManager; -public class ClockEventService implements Callable { +public class ClockEventService implements Callable { private static final Logger logger = Logger.getLogger(ClockEventService.class.getName()); - private final ClockEventManagerService manager; - - private final EventBus eventBus; + private final ClockEventManagerService clockEventManagerService; private final String name; - public ClockEventService(ClockEventManagerService manager, - EventBus eventBus, + public ClockEventService(ClockEventManagerService clockEventManagerService, String name) { - this.manager = manager; - this.eventBus = eventBus; + this.clockEventManagerService = clockEventManagerService; this.name = name; } @Override - public Integer call() { + public Boolean call() { try { - if (manager.getSuspended().contains(name)) { + if (clockEventManagerService.getSuspended().contains(name)) { logger.log(Level.FINE, "clock event " + name + " suspended"); - return 1; + return false; } else { Event clockEvent = EventManager.eventBuilder() .setType("clock") .build(); - eventBus.post(clockEvent); - return 0; + clockEventManagerService.publish(clockEvent); + return true; } } catch (Throwable t) { logger.log(Level.WARNING, t.getMessage(), t); - return 1; + return false; } } } diff --git a/event-common/src/main/java/org/xbib/event/common/AbstractEventManagerService.java b/event-common/src/main/java/org/xbib/event/common/AbstractEventManagerService.java new file mode 100644 index 0000000..7d0f0da --- /dev/null +++ b/event-common/src/main/java/org/xbib/event/common/AbstractEventManagerService.java @@ -0,0 +1,25 @@ +package org.xbib.event.common; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +public abstract class AbstractEventManagerService { + + private List suspended; + + public AbstractEventManagerService() { + this.suspended = new CopyOnWriteArrayList<>(); + } + + public void suspend(String name) { + suspended.add(name); + } + + public void resume(String name) { + suspended.remove(name); + } + + public List getSuspended() { + return suspended; + } +} diff --git a/event-common/src/main/java/org/xbib/event/common/EventManager.java b/event-common/src/main/java/org/xbib/event/common/EventManager.java index 7343f1d..81e6db7 100644 --- a/event-common/src/main/java/org/xbib/event/common/EventManager.java +++ b/event-common/src/main/java/org/xbib/event/common/EventManager.java @@ -11,16 +11,18 @@ import java.util.Map; import java.util.ServiceLoader; import org.xbib.datastructures.json.tiny.Json; +import org.xbib.datastructures.json.tiny.JsonBuilder; import org.xbib.event.Event; import org.xbib.event.EventConsumer; import org.xbib.event.Listener; import org.xbib.event.Payload; import org.xbib.event.bus.AsyncEventBus; import org.xbib.event.bus.EventBus; +import org.xbib.event.bus.Subscriber; import org.xbib.event.bus.SubscriberExceptionContext; import org.xbib.event.bus.SubscriberExceptionHandler; +import org.xbib.event.bus.SubscriberRegistry; import org.xbib.event.clock.ClockEventManagerService; -import org.xbib.event.generic.GenericEventManagerService; import org.xbib.event.path.FileFollowEventManagerService; import org.xbib.event.path.PathEventManagerService; import org.xbib.event.timer.TimerEventManagerService; @@ -31,12 +33,14 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Objects; +import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.logging.Level; import java.util.logging.Logger; -public final class EventManager { +public final class EventManager extends AbstractEventManagerService implements EventManagerService { private static final Logger logger = Logger.getLogger(EventManager.class.getName()); @@ -47,6 +51,7 @@ public final class EventManager { private final Map, EventManagerService> eventManagerServices; private EventManager(EventManagerBuilder builder) { + super(); this.builder = builder; eventTypes.put("null", NullEvent.class); eventTypes.put("generic", GenericEventImpl.class); @@ -59,7 +64,7 @@ public final class EventManager { } logger.log(Level.INFO, "installed events = " + eventTypes.keySet()); this.eventManagerServices = new HashMap<>(); - eventManagerServices.put(GenericEventManagerService.class, new GenericEventManagerService().init(this)); + eventManagerServices.put(this.getClass(), this); eventManagerServices.put(ClockEventManagerService.class, new ClockEventManagerService().init(this)); eventManagerServices.put(TimerEventManagerService.class, new TimerEventManagerService().init(this)); eventManagerServices.put(FileFollowEventManagerService.class, new FileFollowEventManagerService().init(this)); @@ -70,6 +75,10 @@ public final class EventManager { logger.log(Level.INFO, "installed event service managers = " + eventManagerServices.keySet()); } + public EventManagerService init(EventManager eventManager) { + return this; + } + public static EventManagerBuilder builder() { return new EventManagerBuilder(); } @@ -94,19 +103,11 @@ public final class EventManager { return builder.executorService; } - public void dispatch(Event event) { - getGenericEventManagerService().post(event); - } - @SuppressWarnings("unchecked") public T getEventManagerService(Class cl) { return (T) eventManagerServices.get(cl); } - public GenericEventManagerService getGenericEventManagerService() { - return (GenericEventManagerService) eventManagerServices.get(GenericEventManagerService.class); - } - public ClockEventManagerService getClockEventManagerService() { return (ClockEventManagerService) eventManagerServices.get(ClockEventManagerService.class); } @@ -123,7 +124,19 @@ public final class EventManager { return (PathEventManagerService) eventManagerServices.get(PathEventManagerService.class); } - public void close() throws IOException { + public void publish(Event event) { + getEventBus().post(event); + } + + public void publish(GenericEventImpl event, + CompletableFuture future) { + SubscriberRegistry subscriberRegistry = getEventBus().getSubscribers(); + Set set = subscriberRegistry.getSubscribersForTesting(event.getClass()); + event.setListener(new WrappedListener(event.getListener(), set.size(), future)); + publish(event); + } + + public void shutdown() throws IOException { for (EventConsumer eventConsumer : builder.eventConsumers) { if (eventConsumer instanceof Closeable closeable) { closeable.close(); @@ -131,7 +144,9 @@ public final class EventManager { } for (EventManagerService service : eventManagerServices.values()) { try { - service.shutdown(); + if (service != this) { + service.shutdown(); + } } catch (IOException e) { logger.log(Level.SEVERE, e.getMessage(), e); } @@ -244,6 +259,33 @@ public final class EventManager { } } + private static class WrappedListener implements Listener { + + private final Listener listener; + + private int size; + + private final CompletableFuture future; + + public WrappedListener(Listener listener, int size, CompletableFuture future) { + this.listener = listener; + this.size = size; + this.future = future; + } + + @Override + public void listen(Event event) { + if (listener != null) { + listener.listen(event); + } else { + logger.log(Level.WARNING, "listener not set"); + } + if (--size == 0) { + future.complete(event); + } + } + } + public static Event eventOf(String eventType, String code, String message, @@ -338,8 +380,22 @@ public final class EventManager { return builder.fileSize; } + @Override public String toJson() throws IOException { - return Json.toString(builder.payload); + JsonBuilder builder = JsonBuilder.builder(); + builder.beginMap(); + builder.fieldIfNotNull("type", getType()); + builder.fieldIfNotNull("code", getCode()); + builder.fieldIfNotNull("message", getMessage()); + builder.buildKey("payload").buildMap(getPayload()); + builder.fieldIfNotNull("created", getCreated() != null ? getCreated().toString() : null); + builder.fieldIfNotNull("scheduled", getScheduledFor() != null ? getScheduledFor().toString() : null); + builder.fieldIfNotNull("path", getPath() != null ? getPath().toAbsolutePath().toString() : null); + builder.fieldIfNotNull("base", getBase()); + builder.fieldIfNotNull("suffix", getSuffix()); + builder.fieldIfNotNull("filesize", getFileSize()); + builder.endMap(); + return builder.build(); } @Override @@ -507,5 +563,4 @@ public final class EventManager { return pos >= 0 ? name.substring(pos + 1) : null; } } - } diff --git a/event-common/src/main/java/org/xbib/event/generic/GenericEventManagerService.java b/event-common/src/main/java/org/xbib/event/generic/GenericEventManagerService.java deleted file mode 100644 index 62ea75c..0000000 --- a/event-common/src/main/java/org/xbib/event/generic/GenericEventManagerService.java +++ /dev/null @@ -1,77 +0,0 @@ -package org.xbib.event.generic; - -import java.io.IOException; -import java.util.Set; -import java.util.concurrent.CompletableFuture; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.xbib.event.Event; -import org.xbib.event.Listener; -import org.xbib.event.bus.EventBus; -import org.xbib.event.bus.Subscriber; -import org.xbib.event.bus.SubscriberRegistry; -import org.xbib.event.common.EventManager; -import org.xbib.event.common.EventManagerService; -import org.xbib.event.common.GenericEventImpl; - -public class GenericEventManagerService implements EventManagerService { - - private static final Logger logger = Logger.getLogger(GenericEventManagerService.class.getName()); - - private EventBus eventBus; - - public GenericEventManagerService() { - } - - @Override - public GenericEventManagerService init(EventManager eventManager) { - this.eventBus = eventManager.getEventBus(); - return this; - } - - @Override - public void shutdown() throws IOException { - - } - - public void post(Object event) { - eventBus.post(event); - } - - public void post(GenericEventImpl event, - CompletableFuture future) { - SubscriberRegistry subscriberRegistry = eventBus.getSubscribers(); - Set set = subscriberRegistry.getSubscribersForTesting(event.getClass()); - logger.log(Level.INFO, "set = " + set); - event.setListener(new WrappedListener(event.getListener(), set.size(), future)); - post(event); - } - - static class WrappedListener implements Listener { - - private final Listener listener; - - private int size; - - private final CompletableFuture future; - - public WrappedListener(Listener listener, int size, CompletableFuture future) { - this.listener = listener; - this.size = size; - this.future = future; - } - - @Override - public void listen(Event event) { - if (listener != null) { - listener.listen(event); - } else { - logger.log(Level.WARNING, "listener not set"); - } - if (--size == 0) { - future.complete(event); - } - } - } -} diff --git a/event-common/src/main/java/org/xbib/event/path/FileFollowEventManagerService.java b/event-common/src/main/java/org/xbib/event/path/FileFollowEventManagerService.java index 6a3a057..ddb1904 100644 --- a/event-common/src/main/java/org/xbib/event/path/FileFollowEventManagerService.java +++ b/event-common/src/main/java/org/xbib/event/path/FileFollowEventManagerService.java @@ -1,6 +1,8 @@ package org.xbib.event.path; +import org.xbib.event.Event; import org.xbib.event.bus.EventBus; +import org.xbib.event.common.AbstractEventManagerService; import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManagerService; import org.xbib.settings.Settings; @@ -17,22 +19,26 @@ import java.util.logging.Level; import java.util.logging.Logger; import java.util.regex.Pattern; -public class FileFollowEventManagerService implements EventManagerService { +public class FileFollowEventManagerService extends AbstractEventManagerService implements EventManagerService { private static final Logger logger = Logger.getLogger(FileFollowEventManagerService.class.getName()); + private EventManager eventManager; + private Map, FileFollowEventService> eventServiceMap; public FileFollowEventManagerService() { + super(); } @Override public FileFollowEventManagerService init(EventManager eventManager) { + this.eventManager = eventManager; Settings settings = eventManager.getSettings(); - EventBus eventBus = eventManager.getEventBus(); ExecutorService executorService = eventManager.getExecutorService(); this.eventServiceMap = new LinkedHashMap<>(); for (Map.Entry entry : settings.getGroups("event.filefollow").entrySet()) { + String name = entry.getKey(); Settings definition = entry.getValue(); if (definition.getAsBoolean("enabled", true)) { String baseStr = definition.get("base"); @@ -42,12 +48,12 @@ public class FileFollowEventManagerService implements EventManagerService { try { Path base = Paths.get(baseStr); Pattern pattern = Pattern.compile(patternStr); - FileFollowEventService fileFollowEventService = new FileFollowEventService(definition, eventBus, base, pattern); + FileFollowEventService fileFollowEventService = new FileFollowEventService(this, name, base, pattern); Future future = executorService.submit(fileFollowEventService); eventServiceMap.put(future, fileFollowEventService); - logger.log(Level.INFO, "file follow service " + entry.getKey() + " with base " + base + " and pattern " + pattern + " added"); + logger.log(Level.INFO, "file follow service " + name + " with base " + base + " and pattern " + pattern + " added"); } catch (Exception e) { - logger.log(Level.SEVERE, "unable to create file follow service " + entry.getKey() + ", reason " + e.getMessage(), e); + logger.log(Level.SEVERE, "unable to create file follow service " +name + ", reason " + e.getMessage(), e); } } } @@ -65,4 +71,8 @@ public class FileFollowEventManagerService implements EventManagerService { } } } + + public void publish(Event event) { + eventManager.publish(event); + } } diff --git a/event-common/src/main/java/org/xbib/event/path/FileFollowEventService.java b/event-common/src/main/java/org/xbib/event/path/FileFollowEventService.java index dcd88c1..2a34dc3 100644 --- a/event-common/src/main/java/org/xbib/event/path/FileFollowEventService.java +++ b/event-common/src/main/java/org/xbib/event/path/FileFollowEventService.java @@ -1,9 +1,7 @@ package org.xbib.event.path; import org.xbib.event.Event; -import org.xbib.event.bus.EventBus; import org.xbib.event.common.EventManager; -import org.xbib.settings.Settings; import java.io.Closeable; import java.io.IOException; @@ -33,7 +31,9 @@ public class FileFollowEventService implements Callable, Closeable { private static final Logger logger = Logger.getLogger(FileFollowEventService.class.getName()); - private final EventBus eventBus; + private final FileFollowEventManagerService fileFollowEventManagerService; + + private final String name; private final Path base; @@ -45,11 +45,12 @@ public class FileFollowEventService implements Callable, Closeable { private volatile boolean keepWatching; - public FileFollowEventService(Settings settings, - EventBus eventBus, + public FileFollowEventService(FileFollowEventManagerService fileFollowEventManagerService, + String name, Path base, Pattern pattern) throws IOException { - this.eventBus = eventBus; + this.fileFollowEventManagerService = fileFollowEventManagerService; + this.name = name; this.base = base; this.pattern = pattern; FileSystem fileSystem = base.getFileSystem(); @@ -89,13 +90,17 @@ public class FileFollowEventService implements Callable, Closeable { String content = readRange(channel, lastSize, currentSize); // split content by line, this allows pattern matching without preprocessing in worker for (String line : content.split("\n")) { - Event event = EventManager.eventBuilder() - .setType("filefollow") - .setCode(base.toString()) - .setPath(path) - .setMessage(line) - .build(); - eventBus.post(event); + if (fileFollowEventManagerService.getSuspended().contains(name)) { + logger.log(Level.WARNING, name + " is suspended"); + } else { + Event event = EventManager.eventBuilder() + .setType("filefollow") + .setCode(base.toString()) + .setPath(path) + .setMessage(line) + .build(); + fileFollowEventManagerService.publish(event); + } } } } diff --git a/event-common/src/main/java/org/xbib/event/path/PathEventManagerService.java b/event-common/src/main/java/org/xbib/event/path/PathEventManagerService.java index 27c1ed3..3662c00 100644 --- a/event-common/src/main/java/org/xbib/event/path/PathEventManagerService.java +++ b/event-common/src/main/java/org/xbib/event/path/PathEventManagerService.java @@ -3,7 +3,7 @@ package org.xbib.event.path; import org.xbib.datastructures.api.TimeValue; import org.xbib.datastructures.json.tiny.Json; import org.xbib.event.Event; -import org.xbib.event.bus.EventBus; +import org.xbib.event.common.AbstractEventManagerService; import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManagerService; import org.xbib.settings.Settings; @@ -14,7 +14,6 @@ import java.io.Writer; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.ArrayList; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -24,28 +23,26 @@ import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Stream; -public class PathEventManagerService implements EventManagerService { +public class PathEventManagerService extends AbstractEventManagerService implements EventManagerService { private static final Logger logger = Logger.getLogger(PathEventManagerService.class.getName()); - private EventBus eventBus; + private EventManager eventManager; private ExecutorService executorService; private Map, PathEventService> eventServiceMap; - private List suspendedQueues; - public PathEventManagerService() { + super(); } @Override public PathEventManagerService init(EventManager eventManager) { + this.eventManager = eventManager; Settings settings = eventManager.getSettings(); - this.eventBus = eventManager.getEventBus(); this.executorService = eventManager.getExecutorService(); this.eventServiceMap = new LinkedHashMap<>(); - this.suspendedQueues = new ArrayList<>(); for (Map.Entry entry : settings.getGroups("event.path").entrySet()) { try { String name = entry.getKey(); @@ -54,9 +51,12 @@ public class PathEventManagerService implements EventManagerService { Path path = Paths.get(definition.get("path", "/var/tmp/" + name)); TimeValue lifetime = definition.getAsTime("lifetime", TimeValue.timeValueHours(72)); String eventType = definition.get("type", "path"); - createPathEventService(name, path, eventType, lifetime); + createQueue(name, path); + PathEventService pathEventService = new PathEventService(this, name, path, eventType, lifetime); + add(pathEventService); + pathEventService.drainIncoming(); } else { - logger.log(Level.WARNING, "path servive definition not enabled in configuration"); + logger.log(Level.WARNING, "path service definition not enabled in configuration"); } } catch (Exception e) { logger.log(Level.SEVERE, "unable to create event path service " + entry.getKey() + ", reason " + e.getMessage(), e); @@ -65,21 +65,29 @@ public class PathEventManagerService implements EventManagerService { return this; } - public void createPathEventService(String name, Path path, String eventType, TimeValue lifetime) throws IOException { - createQueue(name, path); - PathEventService pathEventService = new PathEventService(this, eventBus, name, path, eventType, lifetime); - add(pathEventService); - } - public void add(PathEventService pathEventService) { Future future = executorService.submit(pathEventService); eventServiceMap.put(future, pathEventService); logger.log(Level.INFO, "path event service " + pathEventService + " added"); } + public void publish(String eventType, Path path) { + Event event = EventManager.eventOf(eventType, null, null, path); + eventManager.publish(event); + } + + public void failEvent(String eventType, Path path) { + try { + Event event = EventManager.eventOf(eventType, null, null, path); + event.fail(); + } catch (IOException e) { + logger.log(Level.SEVERE, "unable to fail event because of " + e.getMessage(), e); + } + } + @Override public void shutdown() throws IOException { - logger.log(Level.INFO, "shut down all path event services"); + logger.log(Level.INFO, "shutting down all path event services"); eventServiceMap.forEach((k, v) -> { k.cancel(true); try { @@ -103,30 +111,18 @@ public class PathEventManagerService implements EventManagerService { }); } - public List getSuspendedQueues() { - return suspendedQueues; - } - - public void suspend(String queue) { - suspendedQueues.add(queue); - } - - public void resume(String queue) { - suspendedQueues.remove(queue); - } - - public boolean put(Path path, String key, Map map) throws IOException { - return put(path, key, ".json", Json.toString(map)); - } - public boolean putIfNotExists(Path path, String key, Map map) throws IOException { if (!exists(path, key, ".json")) { - return put(path, key, ".json", Json.toString(map)); + return put(path, key, map); } else { return false; } } + public boolean put(Path path, String key, Map map) throws IOException { + return put(path, key, ".json", Json.toString(map)); + } + public boolean put(Path path, String key, String suffix, String string) throws IOException { String keyFileName = key + suffix; if (Files.exists(path.resolve(Event.INCOMING).resolve(keyFileName)) || diff --git a/event-common/src/main/java/org/xbib/event/path/PathEventService.java b/event-common/src/main/java/org/xbib/event/path/PathEventService.java index 02397eb..96ee1f9 100644 --- a/event-common/src/main/java/org/xbib/event/path/PathEventService.java +++ b/event-common/src/main/java/org/xbib/event/path/PathEventService.java @@ -4,8 +4,6 @@ import java.nio.file.FileVisitResult; import java.nio.file.SimpleFileVisitor; import org.xbib.datastructures.api.TimeValue; import org.xbib.event.Event; -import org.xbib.event.bus.EventBus; -import org.xbib.event.common.EventManager; import java.io.Closeable; import java.io.IOException; @@ -25,14 +23,12 @@ import java.util.concurrent.Callable; import java.util.logging.Level; import java.util.logging.Logger; -public class PathEventService implements Callable, Closeable { +public class PathEventService implements Callable, Closeable { private static final Logger logger = Logger.getLogger(PathEventService.class.getName()); private final PathEventManagerService pathEventManager; - private final EventBus eventBus; - private final Path path; private final String name; @@ -43,18 +39,14 @@ public class PathEventService implements Callable, Closeable { private final WatchService watchService; - private int eventCount; - private volatile boolean keepWatching; public PathEventService(PathEventManagerService pathEventManager, - EventBus eventBus, String name, Path path, String eventType, TimeValue lifetime) throws IOException { this.pathEventManager = pathEventManager; - this.eventBus = eventBus; this.name = name; this.path = path; this.eventType = eventType; @@ -69,7 +61,7 @@ public class PathEventService implements Callable, Closeable { @SuppressWarnings("unchecked") @Override - public Integer call() { + public Boolean call() { try { logger.log(Level.INFO, "watch service running on " + path.resolve(Event.INCOMING)); while (keepWatching && watchService != null) { @@ -86,10 +78,10 @@ public class PathEventService implements Callable, Closeable { String watchEventContext = pathWatchEvent.context().toString(); Path p = path.resolve(Event.INCOMING).resolve(watchEventContext); logger.log(Level.FINE, "watch event " + pathWatchEvent + " context = " + watchEventContext + " path = " + p); - if (pathEventManager.getSuspendedQueues().contains(name)) { - failEvent(p); + if (pathEventManager.getSuspended().contains(name)) { + pathEventManager.failEvent(eventType, p); } else { - postEvent(p); + pathEventManager.publish(eventType, p); } } watchKey.reset(); @@ -102,7 +94,7 @@ public class PathEventService implements Callable, Closeable { } catch (Exception e) { logger.log(Level.SEVERE, e.getMessage(), e); } - return eventCount; + return true; } @Override @@ -120,7 +112,7 @@ public class PathEventService implements Callable, Closeable { public void drainIncoming() throws IOException { try (DirectoryStream stream = Files.newDirectoryStream(path.resolve(Event.INCOMING))) { - stream.forEach(this::postEvent); + stream.forEach(p -> pathEventManager.publish(eventType, p)); } } @@ -149,21 +141,6 @@ public class PathEventService implements Callable, Closeable { } } - private void postEvent(Path path) { - Event event = EventManager.eventOf(eventType, null, null, path); - eventBus.post(event); - eventCount++; - } - - private void failEvent(Path file) { - try { - Event event = EventManager.eventFromFile(file); - event.fail(); - } catch (IOException e) { - logger.log(Level.SEVERE, "unable to fail event because of " + e.getMessage(), e); - } - } - private static void delete(Path path) { if (path == null) { return; diff --git a/event-common/src/main/java/org/xbib/event/timer/TimerEventManagerService.java b/event-common/src/main/java/org/xbib/event/timer/TimerEventManagerService.java index 451efc6..6d72b4b 100644 --- a/event-common/src/main/java/org/xbib/event/timer/TimerEventManagerService.java +++ b/event-common/src/main/java/org/xbib/event/timer/TimerEventManagerService.java @@ -1,7 +1,8 @@ package org.xbib.event.timer; +import org.xbib.event.Event; import org.xbib.event.Payload; -import org.xbib.event.bus.EventBus; +import org.xbib.event.common.AbstractEventManagerService; import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManagerService; import org.xbib.event.persistence.FilePersistenceStore; @@ -20,26 +21,29 @@ import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; -public class TimerEventManagerService implements EventManagerService { +public class TimerEventManagerService extends AbstractEventManagerService implements EventManagerService { private static final Logger logger = Logger.getLogger(TimerEventManagerService.class.getName()); + private EventManager eventManager; + private Map services; public TimerEventManagerService() { + super(); } @Override public TimerEventManagerService init(EventManager eventManager) { + this.eventManager = eventManager; Settings settings = eventManager.getSettings(); - EventBus eventBus = eventManager.getEventBus(); this.services = new LinkedHashMap<>(); for (Map.Entry entry : settings.getGroups("event.timer").entrySet()) { String name = entry.getKey(); Settings timerSettings = entry.getValue(); try { PersistenceStore persistenceStore = new FilePersistenceStore(timerSettings, name); - TimerEventService timerEventService = new TimerEventService(eventBus, name, ZoneId.systemDefault(), persistenceStore); + TimerEventService timerEventService = new TimerEventService(this, name, ZoneId.systemDefault(), persistenceStore); services.put(name, timerEventService); logger.log(Level.INFO, "timer " + name + " active: " + timerEventService); } catch (Exception e) { @@ -49,9 +53,21 @@ public class TimerEventManagerService implements EventManagerService { return this; } - public boolean publish(String name, - String timeSpec, - Payload payload) throws ParseException, IOException { + @Override + public void shutdown() throws IOException { + for (Map.Entry entry : services.entrySet()) { + logger.log(Level.INFO, "closing timer " + entry.getKey()); + entry.getValue().close(); + } + } + + public void publish(Event event) { + eventManager.publish(event); + } + + public boolean schedule(String name, + String timeSpec, + Payload payload) throws ParseException, IOException { if (services.containsKey(name)) { Span span = Chronic.parse(timeSpec); if (span != null) { @@ -69,9 +85,9 @@ public class TimerEventManagerService implements EventManagerService { return false; } - public boolean publish(String service, - Instant instant, - Payload payload) throws IOException { + public boolean schedule(String service, + Instant instant, + Payload payload) throws IOException { if (services.containsKey(service)) { services.get(service).schedule(instant, payload); return true; @@ -87,12 +103,4 @@ public class TimerEventManagerService implements EventManagerService { entry.getValue().purge(); } } - - @Override - public void shutdown() throws IOException { - for (Map.Entry entry : services.entrySet()) { - logger.log(Level.INFO, "closing timer " + entry.getKey()); - entry.getValue().close(); - } - } } diff --git a/event-common/src/main/java/org/xbib/event/timer/TimerEventService.java b/event-common/src/main/java/org/xbib/event/timer/TimerEventService.java index a979c06..7d0c40d 100644 --- a/event-common/src/main/java/org/xbib/event/timer/TimerEventService.java +++ b/event-common/src/main/java/org/xbib/event/timer/TimerEventService.java @@ -2,7 +2,6 @@ package org.xbib.event.timer; import org.xbib.event.Event; import org.xbib.event.Payload; -import org.xbib.event.bus.EventBus; import org.xbib.event.common.EventManager; import org.xbib.event.persistence.PersistenceStore; @@ -25,8 +24,10 @@ import java.util.logging.Logger; class TimerEventService implements Closeable { private static final Logger logger = Logger.getLogger(TimerEventService.class.getName()); - - private final EventBus eventBus; + + private final TimerEventManagerService timerEventManagerService; + + private final String name; private final ZoneId zoneId; @@ -34,11 +35,12 @@ class TimerEventService implements Closeable { private final Timer timer; - public TimerEventService(EventBus eventBus, + public TimerEventService(TimerEventManagerService timerEventManagerService, String name, ZoneId zoneId, PersistenceStore persistenceStore) throws IOException { - this.eventBus = eventBus; + this.timerEventManagerService = timerEventManagerService; + this.name = name; this.zoneId = zoneId; this.persistenceStore = persistenceStore; this.timer = new Timer(); @@ -97,12 +99,16 @@ class TimerEventService implements Closeable { @Override public void run() { try { + if (timerEventManagerService.getSuspended().contains(name)) { + logger.log(Level.FINE, "timer event " + name + " suspended"); + return; + } Event timerEvent = EventManager.eventBuilder() .setType("timer") + .setCode(name) .setPayload(payload) .build(); - logger.log(Level.FINE, "posting timer event " + timerEvent.getClass().getName() + " with payload = " + payload); - eventBus.post(timerEvent); + timerEventManagerService.publish(timerEvent); logger.log(Level.FINE, "persistence before remove: " + persistenceStore.get("tasks")); if (persistenceStore.remove("tasks", this.payload)) { logger.log(Level.FINE, "removal done"); diff --git a/event-common/src/test/java/org/xbib/event/clock/ClockEventManagerTest.java b/event-common/src/test/java/org/xbib/event/clock/ClockEventManagerTest.java index 5b03cf1..2e39ec5 100644 --- a/event-common/src/test/java/org/xbib/event/clock/ClockEventManagerTest.java +++ b/event-common/src/test/java/org/xbib/event/clock/ClockEventManagerTest.java @@ -20,6 +20,6 @@ public class ClockEventManagerTest { .register(clockEventConsumer) .build(); Thread.sleep(90000L); - eventManager.close(); + eventManager.shutdown(); } } diff --git a/event-common/src/test/java/org/xbib/event/generic/GenericEventManagerTest.java b/event-common/src/test/java/org/xbib/event/common/GenericEventManagerTest.java similarity index 90% rename from event-common/src/test/java/org/xbib/event/generic/GenericEventManagerTest.java rename to event-common/src/test/java/org/xbib/event/common/GenericEventManagerTest.java index 6dbcfe5..c7e7f4d 100644 --- a/event-common/src/test/java/org/xbib/event/generic/GenericEventManagerTest.java +++ b/event-common/src/test/java/org/xbib/event/common/GenericEventManagerTest.java @@ -1,11 +1,9 @@ -package org.xbib.event.generic; +package org.xbib.event.common; import org.junit.jupiter.api.Test; import org.xbib.event.Event; import org.xbib.event.EventConsumer; import org.xbib.event.bus.Subscribe; -import org.xbib.event.common.EventManager; -import org.xbib.event.common.GenericEventImpl; import org.xbib.settings.Settings; import java.util.concurrent.CompletableFuture; @@ -32,7 +30,7 @@ public class GenericEventManagerTest { .setType("generic") .setListener(e -> logger.log(Level.INFO, "received event " + e)) .build(); - eventManager.getGenericEventManagerService().post(event); + eventManager.publish(event); // we must wait for a certain time because we do not use a future Thread.sleep(500L); assertEquals(1, consumer.getCount()); @@ -52,7 +50,7 @@ public class GenericEventManagerTest { future.complete(e); }) .build(); - eventManager.getGenericEventManagerService().post(event); + eventManager.publish(event); Event e = future.get(1000L, TimeUnit.MILLISECONDS); assertNotNull(e); } @@ -76,7 +74,7 @@ public class GenericEventManagerTest { future.complete(e); }) .build(); - eventManager.getGenericEventManagerService().post((GenericEventImpl) event, future); + eventManager.publish((GenericEventImpl) event, future); Event e = future.get(5L, TimeUnit.SECONDS); if (e != null) { logger.log(Level.INFO, "the event " + e + " was received by all consumers"); diff --git a/event-common/src/test/java/org/xbib/event/path/FileFollowEventManagerTest.java b/event-common/src/test/java/org/xbib/event/path/FileFollowEventManagerTest.java index 22e7441..62a697a 100644 --- a/event-common/src/test/java/org/xbib/event/path/FileFollowEventManagerTest.java +++ b/event-common/src/test/java/org/xbib/event/path/FileFollowEventManagerTest.java @@ -37,6 +37,6 @@ public class FileFollowEventManagerTest { Thread.sleep(1000L); Files.delete(testTxt); Files.delete(path); - eventManager.close(); + eventManager.shutdown(); } } diff --git a/event-common/src/test/java/org/xbib/event/path/PathEventManagerTest.java b/event-common/src/test/java/org/xbib/event/path/PathEventManagerTest.java index d2fcc25..be9910e 100644 --- a/event-common/src/test/java/org/xbib/event/path/PathEventManagerTest.java +++ b/event-common/src/test/java/org/xbib/event/path/PathEventManagerTest.java @@ -34,7 +34,8 @@ public class PathEventManagerTest { bufferedWriter.write("Hello"); } Thread.sleep(2000L); - eventManager.close(); + eventManager.shutdown(); + // extra destroy to clean up test eventManager.getPathEventManagerService().destroy(); } @@ -57,7 +58,8 @@ public class PathEventManagerTest { bufferedWriter.write("Hello"); } Thread.sleep(2000L); - eventManager.close(); + eventManager.shutdown(); + // extra destroy to clean up test eventManager.getPathEventManagerService().destroy(); } diff --git a/event-common/src/test/java/org/xbib/event/timer/TimerEventManagerTest.java b/event-common/src/test/java/org/xbib/event/timer/TimerEventManagerTest.java index 6e12683..3670262 100644 --- a/event-common/src/test/java/org/xbib/event/timer/TimerEventManagerTest.java +++ b/event-common/src/test/java/org/xbib/event/timer/TimerEventManagerTest.java @@ -23,7 +23,7 @@ public class TimerEventManagerTest { .build(); TimerEventManagerService timerEventManager = eventManager.getTimerEventManagerService(); Payload payload = new Payload(Map.of("a", "b")); - timerEventManager.publish("testtimerevent", Instant.now().plusSeconds(5L), payload); + timerEventManager.schedule("testtimerevent", Instant.now().plusSeconds(5L), payload); Thread.sleep(10000L); timerEventManager.shutdown(); } diff --git a/event-net-http/src/main/java/org/xbib/event/net/http/HttpEventReceiverService.java b/event-net-http/src/main/java/org/xbib/event/net/http/HttpEventReceiverService.java index d18804a..217e4ed 100644 --- a/event-net-http/src/main/java/org/xbib/event/net/http/HttpEventReceiverService.java +++ b/event-net-http/src/main/java/org/xbib/event/net/http/HttpEventReceiverService.java @@ -24,14 +24,14 @@ public class HttpEventReceiverService { public HttpService createService(String prefix) { return BaseHttpService.builder() .setPrefix(prefix) - .setPath("/event/{type}") + .setPath("/event") .setMethod(HttpMethod.POST) .setHandler(ctx -> { Event event = EventManager.eventFromJson(ctx.getRequest().asJson()); if (event.isNullEvent()) { ctx.status(NOT_FOUND).done(); } else { - eventManager.dispatch(event); + eventManager.publish(event); ctx.status(OK) .header(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON) .charset(StandardCharsets.UTF_8)