Compare commits

..

No commits in common. '7c5a1f1a2b4b688c3cfc602f07e65b435d75eba7' and '120a9e915df2a9c1c31f03998efbeeb7355409d8' have entirely different histories.

@ -0,0 +1,33 @@
package org.xbib.event;
import java.util.Map;
public class DefaultEvent implements Event {
private String key;
private Map<String, Object> map;
public DefaultEvent() {
}
@Override
public void setKey(String key) {
this.key = key;
}
@Override
public String getKey() {
return key;
}
@Override
public void setMap(Map<String, Object> map) {
this.map = map;
}
@Override
public Map<String, Object> getMap() {
return map;
}
}

@ -4,9 +4,9 @@ import java.util.Map;
public interface Event { public interface Event {
void setType(String type); void setKey(String key);
String getType(); String getKey();
void setMap(Map<String, Object> map); void setMap(Map<String, Object> map);

@ -6,4 +6,5 @@ dependencies {
implementation libs.time implementation libs.time
implementation libs.datastructures.common implementation libs.datastructures.common
implementation libs.datastructures.json.tiny implementation libs.datastructures.json.tiny
implementation libs.netty.handler
} }

@ -1,5 +1,3 @@
import org.xbib.event.common.EventManagerService;
module org.xbib.event.common { module org.xbib.event.common {
exports org.xbib.event.bus; exports org.xbib.event.bus;
exports org.xbib.event.clock; exports org.xbib.event.clock;
@ -8,9 +6,9 @@ module org.xbib.event.common {
exports org.xbib.event.log; exports org.xbib.event.log;
exports org.xbib.event.path; exports org.xbib.event.path;
exports org.xbib.event.persistence; exports org.xbib.event.persistence;
exports org.xbib.event.syslog;
exports org.xbib.event.timer; exports org.xbib.event.timer;
exports org.xbib.event.wal; exports org.xbib.event.wal;
uses EventManagerService;
requires org.xbib.event.api; requires org.xbib.event.api;
requires org.xbib.settings.api; requires org.xbib.settings.api;
requires org.xbib.settings.datastructures.json; requires org.xbib.settings.datastructures.json;
@ -18,5 +16,10 @@ module org.xbib.event.common {
requires org.xbib.time; requires org.xbib.time;
requires org.xbib.datastructures.common; requires org.xbib.datastructures.common;
requires org.xbib.datastructures.json.tiny; requires org.xbib.datastructures.json.tiny;
requires io.netty.buffer;
requires io.netty.common;
requires io.netty.transport;
requires io.netty.handler;
requires io.netty.codec;
requires java.logging; requires java.logging;
} }

@ -1,7 +1,7 @@
package org.xbib.event.clock; package org.xbib.event.clock;
import org.xbib.event.bus.AsyncEventBus;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.EventBus;
import org.xbib.event.common.EventManagerService;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
import org.xbib.time.schedule.CronExpression; import org.xbib.time.schedule.CronExpression;
import org.xbib.time.schedule.CronSchedule; import org.xbib.time.schedule.CronSchedule;
@ -11,25 +11,27 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
public class ClockEventManagerService implements EventManagerService, Closeable { public class ClockEventManager implements Closeable {
private static final Logger logger = Logger.getLogger(ClockEventManagerService.class.getName()); private static final Logger logger = Logger.getLogger(ClockEventManager.class.getName());
private CronSchedule<Integer> cronSchedule; private final CronSchedule<Integer> cronSchedule;
private List<String> suspended; private final List<String> suspended;
public ClockEventManagerService() { public ClockEventManager(Settings settings) {
this(settings, new AsyncEventBus(Executors.newSingleThreadExecutor()), ClockEventManager.class.getClassLoader());
} }
public ClockEventManagerService init(Settings settings, EventBus eventBus, ClassLoader classLoader, ExecutorService executorService) { public ClockEventManager(Settings settings,
EventBus eventBus,
ClassLoader classLoader) {
this.suspended = new ArrayList<>(); this.suspended = new ArrayList<>();
ThreadFactory threadFactory = new ThreadFactory() { ThreadFactory threadFactory = new ThreadFactory() {
int n = 1; int n = 1;
@ -38,9 +40,9 @@ public class ClockEventManagerService implements EventManagerService, Closeable
return new Thread(r, "clock-event-manager-" + (n++)); return new Thread(r, "clock-event-manager-" + (n++));
} }
}; };
ScheduledExecutorService scheduledExecutorService = ScheduledExecutorService executorService =
Executors.newScheduledThreadPool(settings.getAsInt("pool.size", 2), threadFactory); Executors.newScheduledThreadPool(settings.getAsInt("pool.size", 2), threadFactory);
this.cronSchedule = new CronSchedule<>(scheduledExecutorService); this.cronSchedule = new CronSchedule<>(executorService);
for (Map.Entry<String, Settings> clockEventService : settings.getGroups("event.clock").entrySet()) { for (Map.Entry<String, Settings> clockEventService : settings.getGroups("event.clock").entrySet()) {
String name = clockEventService.getKey(); String name = clockEventService.getKey();
Settings entrySettings = clockEventService.getValue(); Settings entrySettings = clockEventService.getValue();
@ -67,7 +69,6 @@ public class ClockEventManagerService implements EventManagerService, Closeable
} }
logger.log(Level.INFO, "entries = " + cronSchedule.getEntries()); logger.log(Level.INFO, "entries = " + cronSchedule.getEntries());
cronSchedule.start(); cronSchedule.start();
return this;
} }
public List<String> getSuspended() { public List<String> getSuspended() {

@ -10,7 +10,7 @@ public class ClockEventService implements Callable<Integer> {
private static final Logger logger = Logger.getLogger(ClockEventService.class.getName()); private static final Logger logger = Logger.getLogger(ClockEventService.class.getName());
private final ClockEventManagerService manager; private final ClockEventManager manager;
private final EventBus eventBus; private final EventBus eventBus;
@ -18,7 +18,7 @@ public class ClockEventService implements Callable<Integer> {
private final Class<? extends ClockEvent> eventClass; private final Class<? extends ClockEvent> eventClass;
public ClockEventService(ClockEventManagerService manager, public ClockEventService(ClockEventManager manager,
EventBus eventBus, EventBus eventBus,
String name, String name,
Class<? extends ClockEvent> eventClass) { Class<? extends ClockEvent> eventClass) {
@ -29,7 +29,7 @@ public class ClockEventService implements Callable<Integer> {
} }
@Override @Override
public Integer call() { public Integer call() throws Exception {
try { try {
if (manager.getSuspended().contains(name)) { if (manager.getSuspended().contains(name)) {
logger.log(Level.FINE, "clock event " + name + " suspended"); logger.log(Level.FINE, "clock event " + name + " suspended");

@ -1,14 +1,14 @@
package org.xbib.event.clock; package org.xbib.event.clock;
import org.xbib.event.common.EventImpl; import org.xbib.event.DefaultEvent;
import java.time.Instant; import java.time.Instant;
public class ClockEventImpl extends EventImpl implements ClockEvent { public class DefaultClockEvent extends DefaultEvent implements ClockEvent {
private Instant instant; private Instant instant;
public ClockEventImpl() { public DefaultClockEvent() {
} }
@Override @Override

@ -1,53 +0,0 @@
package org.xbib.event.common;
import org.xbib.datastructures.json.tiny.Json;
import org.xbib.event.Event;
import java.io.IOException;
import java.util.Map;
public class EventImpl implements Event {
private String type;
private Map<String, Object> map;
public EventImpl() {
}
@Override
public void setType(String type) {
this.type = type;
}
@Override
public String getType() {
return type;
}
@Override
public void setMap(Map<String, Object> map) {
this.map = map;
}
@Override
public Map<String, Object> getMap() {
return map;
}
public static EventImpl fromJson(String json) {
Map<String, Object> map = Json.toMap(json);
EventImpl event = new EventImpl();
event.setType((String) map.get("type"));
event.setMap(map);
return event;
}
public String toJson() throws IOException {
return Json.toString(map);
}
public boolean isNullEvent() {
return type == null;
}
}

@ -1,23 +1,22 @@
package org.xbib.event.common; package org.xbib.event.common;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader; import java.util.ServiceLoader;
import org.xbib.event.Event;
import org.xbib.event.EventConsumer; import org.xbib.event.EventConsumer;
import org.xbib.event.bus.AsyncEventBus; import org.xbib.event.bus.AsyncEventBus;
import org.xbib.event.bus.SubscriberExceptionContext; import org.xbib.event.bus.SubscriberExceptionContext;
import org.xbib.event.bus.SubscriberExceptionHandler; import org.xbib.event.bus.SubscriberExceptionHandler;
import org.xbib.event.clock.ClockEventManagerService; import org.xbib.event.clock.ClockEventManager;
import org.xbib.event.generic.GenericEventManagerService; import org.xbib.event.generic.GenericEventManager;
import org.xbib.event.path.FileFollowEventManagerService; import org.xbib.event.path.FileFollowEventManager;
import org.xbib.event.path.PathEventManagerService; import org.xbib.event.path.PathEventManager;
import org.xbib.event.timer.TimerEventManagerService; import org.xbib.event.syslog.SyslogEventManager;
import org.xbib.event.timer.TimerEventManager;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.time.ZoneId;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
@ -32,57 +31,54 @@ public final class EventManager {
private final Builder builder; private final Builder builder;
private final Map<Class<? extends EventManagerService>, EventManagerService> services; private final GenericEventManager genericEventManager;
private final ClockEventManager clockEventManager;
private final TimerEventManager timerEventManager;
private final FileFollowEventManager fileFollowEventManager;
private final PathEventManager pathEventManager;
private final SyslogEventManager syslogEventManager;
private EventManager(Builder builder) { private EventManager(Builder builder) {
this.builder = builder; this.builder = builder;
this.services = new HashMap<>(); this.genericEventManager = new GenericEventManager(builder.eventBus);
services.put(GenericEventManagerService.class, new GenericEventManagerService() this.clockEventManager = new ClockEventManager(builder.settings, builder.eventBus, builder.classLoader);
.init(builder.settings, builder.eventBus, builder.classLoader, builder.executorService)); this.timerEventManager = new TimerEventManager(builder.settings, builder.eventBus, builder.classLoader, ZoneId.systemDefault());
services.put(ClockEventManagerService.class, new ClockEventManagerService() this.fileFollowEventManager = new FileFollowEventManager(builder.settings, builder.eventBus, builder.executorService, builder.classLoader);
.init(builder.settings, builder.eventBus, builder.classLoader, builder.executorService)); this.pathEventManager = new PathEventManager(builder.settings, builder.eventBus, builder.executorService, builder.classLoader);
services.put(TimerEventManagerService.class, new TimerEventManagerService() this.syslogEventManager = new SyslogEventManager(builder.settings, builder.eventBus);
.init(builder.settings, builder.eventBus, builder.classLoader, builder.executorService));
services.put(FileFollowEventManagerService.class, new FileFollowEventManagerService()
.init(builder.settings, builder.eventBus, builder.classLoader, builder.executorService));
services.put(PathEventManagerService.class, new PathEventManagerService()
.init(builder.settings, builder.eventBus, builder.classLoader, builder.executorService));
for (EventManagerService service : ServiceLoader.load(EventManagerService.class)) {
services.put(service.getClass(), service.init(builder.settings, builder.eventBus, builder.classLoader, builder.executorService));
}
logger.log(Level.INFO, "installed event service managers = " + services.keySet());
} }
public static Builder builder(Settings settings) { public static Builder builder(Settings settings) {
return new Builder(settings); return new Builder(settings);
} }
public void submit(Event event) { public GenericEventManager getGenericEventManager() {
getGenericEventManagerService().post(event); return genericEventManager;
} }
public EventManagerService getEventManagerService(Class<? extends EventManagerService> cl) { public ClockEventManager getClockEventManager() {
return services.get(cl); return clockEventManager;
} }
public GenericEventManagerService getGenericEventManagerService() { public TimerEventManager getTimerEventManager() {
return (GenericEventManagerService) services.get(GenericEventManagerService.class); return timerEventManager;
} }
public ClockEventManagerService getClockEventManagerService() { public FileFollowEventManager getFileFollowEventManager() {
return (ClockEventManagerService) services.get(ClockEventManagerService.class); return fileFollowEventManager;
} }
public TimerEventManagerService getTimerEventManagerService() { public PathEventManager getPathEventManager() {
return (TimerEventManagerService) services.get(TimerEventManagerService.class); return pathEventManager;
} }
public FileFollowEventManagerService getFileFollowEventManagerService() { public SyslogEventManager getSyslogEventManager() {
return (FileFollowEventManagerService) services.get(FileFollowEventManagerService.class); return syslogEventManager;
}
public PathEventManagerService getPathEventManagerService() {
return (PathEventManagerService) services.get(PathEventManagerService.class);
} }
public void close() throws IOException { public void close() throws IOException {
@ -91,11 +87,11 @@ public final class EventManager {
closeable.close(); closeable.close();
} }
} }
for (EventManagerService service : services.values()) { clockEventManager.close();
if (service instanceof Closeable closeable) { timerEventManager.close();
closeable.close(); fileFollowEventManager.close();
} pathEventManager.close();
} syslogEventManager.close();
} }
public static class Builder { public static class Builder {

@ -1,11 +0,0 @@
package org.xbib.event.common;
import org.xbib.event.bus.EventBus;
import org.xbib.settings.Settings;
import java.util.concurrent.ExecutorService;
public interface EventManagerService {
EventManagerService init(Settings settings, EventBus eventBus, ClassLoader classLoader, ExecutorService executorService);
}

@ -1,20 +1,20 @@
package org.xbib.event.generic; package org.xbib.event.generic;
import org.xbib.event.common.EventImpl; import org.xbib.event.DefaultEvent;
public class GenericEventImpl extends EventImpl implements GenericEvent { public class DefaultGenericEvent extends DefaultEvent implements GenericEvent {
private Listener listener; private Listener listener;
public GenericEventImpl() { public DefaultGenericEvent() {
this(null); this(null);
} }
public GenericEventImpl(Listener listener) { public DefaultGenericEvent(Listener listener) {
this.listener = listener; this.listener = listener;
} }
public GenericEventImpl setListener(Listener listener) { public DefaultGenericEvent setListener(Listener listener) {
this.listener = listener; this.listener = listener;
return this; return this;
} }

@ -2,32 +2,24 @@ package org.xbib.event.generic;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.AsyncEventBus;
import org.xbib.event.bus.Subscriber; import org.xbib.event.bus.Subscriber;
import org.xbib.event.bus.SubscriberRegistry; import org.xbib.event.bus.SubscriberRegistry;
import org.xbib.event.common.EventManagerService;
import org.xbib.settings.Settings;
public class GenericEventManagerService implements EventManagerService { public class GenericEventManager {
private EventBus eventBus; private final AsyncEventBus eventBus;
public GenericEventManagerService() { public GenericEventManager(AsyncEventBus eventBus) {
}
@Override
public GenericEventManagerService init(Settings settings, EventBus eventBus, ClassLoader classLoader, ExecutorService executorService) {
this.eventBus = eventBus; this.eventBus = eventBus;
return this;
} }
public void post(Object event) { public void post(Object event) {
eventBus.post(event); eventBus.post(event);
} }
public void post(GenericEventImpl event, public void post(DefaultGenericEvent event,
CompletableFuture<GenericEvent> future) { CompletableFuture<GenericEvent> future) {
SubscriberRegistry subscriberRegistry = eventBus.getSubscribers(); SubscriberRegistry subscriberRegistry = eventBus.getSubscribers();
Set<Subscriber> set = subscriberRegistry.getSubscribersForTesting(event.getClass()); Set<Subscriber> set = subscriberRegistry.getSubscribersForTesting(event.getClass());

@ -1,16 +1,16 @@
package org.xbib.event.path; package org.xbib.event.path;
import org.xbib.event.common.EventImpl; import org.xbib.event.DefaultEvent;
import java.nio.file.Path; import java.nio.file.Path;
public class FileFollowEventImpl extends EventImpl implements FileFollowEvent { public class DefaultFileFollowEvent extends DefaultEvent implements FileFollowEvent {
private Path path; private Path path;
private String content; private String content;
public FileFollowEventImpl() { public DefaultFileFollowEvent() {
} }
@Override @Override

@ -1,6 +1,6 @@
package org.xbib.event.path; package org.xbib.event.path;
import org.xbib.event.common.EventImpl; import org.xbib.event.DefaultEvent;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
@ -9,7 +9,7 @@ import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime; import java.nio.file.attribute.FileTime;
import java.time.Instant; import java.time.Instant;
public class PathEventImpl extends EventImpl implements PathEvent { public class DefaultPathEvent extends DefaultEvent implements PathEvent {
private Path path; private Path path;

@ -1,7 +1,6 @@
package org.xbib.event.path; package org.xbib.event.path;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.AsyncEventBus;
import org.xbib.event.common.EventManagerService;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
import java.io.Closeable; import java.io.Closeable;
@ -17,21 +16,17 @@ import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import java.util.regex.Pattern; import java.util.regex.Pattern;
public class FileFollowEventManagerService implements EventManagerService, Closeable { public class FileFollowEventManager implements Closeable {
private static final Logger logger = Logger.getLogger(FileFollowEventManagerService.class.getName()); private static final Logger logger = Logger.getLogger(FileFollowEventManager.class.getName());
private Map<Future<?>, FileFollowEventService> eventServiceMap; private final Map<Future<?>, FileFollowEventService> eventServiceMap;
public FileFollowEventManagerService() {
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override public FileFollowEventManager(Settings settings,
public FileFollowEventManagerService init(Settings settings, AsyncEventBus eventBus,
EventBus eventBus, ExecutorService executorService,
ClassLoader classLoader, ClassLoader classLoader) {
ExecutorService executorService) {
this.eventServiceMap = new LinkedHashMap<>(); this.eventServiceMap = new LinkedHashMap<>();
for (Map.Entry<String, Settings> entry : settings.getGroups("event.filefollow").entrySet()) { for (Map.Entry<String, Settings> entry : settings.getGroups("event.filefollow").entrySet()) {
Settings definition = entry.getValue(); Settings definition = entry.getValue();
@ -43,7 +38,7 @@ public class FileFollowEventManagerService implements EventManagerService, Close
try { try {
Path base = Paths.get(baseStr); Path base = Paths.get(baseStr);
Pattern pattern = Pattern.compile(patternStr); Pattern pattern = Pattern.compile(patternStr);
String className = definition.get("class", FileFollowEventImpl.class.getName()); String className = definition.get("class", DefaultFileFollowEvent.class.getName());
Class<? extends FileFollowEvent> eventClass = (Class<? extends FileFollowEvent>) classLoader.loadClass(className); Class<? extends FileFollowEvent> eventClass = (Class<? extends FileFollowEvent>) classLoader.loadClass(className);
FileFollowEventService fileFollowEventService = new FileFollowEventService(definition, eventBus, base, pattern, eventClass); FileFollowEventService fileFollowEventService = new FileFollowEventService(definition, eventBus, base, pattern, eventClass);
Future<?> future = executorService.submit(fileFollowEventService); Future<?> future = executorService.submit(fileFollowEventService);
@ -54,7 +49,6 @@ public class FileFollowEventManagerService implements EventManagerService, Close
} }
} }
} }
return this;
} }
@Override @Override

@ -92,7 +92,7 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
// split content by line, this allows pattern matching without preprocessing in worker // split content by line, this allows pattern matching without preprocessing in worker
for (String line : content.split("\n")) { for (String line : content.split("\n")) {
FileFollowEvent event = eventClass.getDeclaredConstructor().newInstance(); FileFollowEvent event = eventClass.getDeclaredConstructor().newInstance();
event.setType(base.toString()); event.setKey(base.toString());
event.setPath(path); event.setPath(path);
event.setContent(line); event.setContent(line);
eventBus.post(event); eventBus.post(event);

@ -3,7 +3,6 @@ package org.xbib.event.path;
import org.xbib.datastructures.api.TimeValue; import org.xbib.datastructures.api.TimeValue;
import org.xbib.datastructures.json.tiny.Json; import org.xbib.datastructures.json.tiny.Json;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.EventBus;
import org.xbib.event.common.EventManagerService;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
import java.io.Closeable; import java.io.Closeable;
@ -23,29 +22,25 @@ import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import java.util.stream.Stream; import java.util.stream.Stream;
public class PathEventManagerService implements EventManagerService, Closeable { public class PathEventManager implements Closeable {
private static final Logger logger = Logger.getLogger(PathEventManagerService.class.getName()); private static final Logger logger = Logger.getLogger(PathEventManager.class.getName());
private EventBus eventBus; private final EventBus eventBus;
private ExecutorService executorService; private final ExecutorService executorService;
private Path path; private final Path path;
private Map<Future<?>, PathEventService> eventServiceMap; private final Map<Future<?>, PathEventService> eventServiceMap;
private List<String> suspendedQueues; private final List<String> suspendedQueues;
public PathEventManagerService() {
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override public PathEventManager(Settings settings,
public PathEventManagerService init(Settings settings, EventBus eventBus,
EventBus eventBus, ExecutorService executorService,
ClassLoader classLoader, ClassLoader classLoader) {
ExecutorService executorService) {
this.eventBus = eventBus; this.eventBus = eventBus;
this.executorService = executorService; this.executorService = executorService;
this.eventServiceMap = new LinkedHashMap<>(); this.eventServiceMap = new LinkedHashMap<>();
@ -58,7 +53,7 @@ public class PathEventManagerService implements EventManagerService, Closeable {
if (definition.getAsBoolean("enabled", true)) { if (definition.getAsBoolean("enabled", true)) {
int maxBytes = definition.getAsInt("maxfilesize", 10 * 1024 * 1024); // 10 MB int maxBytes = definition.getAsInt("maxfilesize", 10 * 1024 * 1024); // 10 MB
TimeValue lifetime = definition.getAsTime("lifetime", TimeValue.timeValueHours(72)); TimeValue lifetime = definition.getAsTime("lifetime", TimeValue.timeValueHours(72));
String className = definition.get("class", PathEventImpl.class.getName()); String className = definition.get("class", DefaultPathEvent.class.getName());
Class<? extends PathEvent> eventClass = (Class<? extends PathEvent>) classLoader.loadClass(className); Class<? extends PathEvent> eventClass = (Class<? extends PathEvent>) classLoader.loadClass(className);
Path p = path.resolve(name); Path p = path.resolve(name);
createPathEventService(name, p, maxBytes, lifetime, eventClass); createPathEventService(name, p, maxBytes, lifetime, eventClass);
@ -69,7 +64,6 @@ public class PathEventManagerService implements EventManagerService, Closeable {
logger.log(Level.SEVERE, "unable to create event path service " + entry.getKey() + ", reason " + e.getMessage(), e); logger.log(Level.SEVERE, "unable to create event path service " + entry.getKey() + ", reason " + e.getMessage(), e);
} }
} }
return this;
} }
public void createPathEventService(String name, public void createPathEventService(String name,

@ -32,7 +32,7 @@ public class PathEventService implements Callable<Integer>, Closeable {
public static final String FAIL = "fail"; public static final String FAIL = "fail";
private final PathEventManagerService pathEventManager; private final PathEventManager pathEventManager;
private final EventBus eventBus; private final EventBus eventBus;
@ -52,7 +52,7 @@ public class PathEventService implements Callable<Integer>, Closeable {
private volatile boolean keepWatching; private volatile boolean keepWatching;
public PathEventService(PathEventManagerService pathEventManager, public PathEventService(PathEventManager pathEventManager,
EventBus eventBus, EventBus eventBus,
String name, String name,
Path path, Path path,
@ -215,7 +215,7 @@ public class PathEventService implements Callable<Integer>, Closeable {
private PathEvent toEvent(String base, Path file, String suffix, String json) { private PathEvent toEvent(String base, Path file, String suffix, String json) {
try { try {
PathEvent event = pathEventClass.getConstructor().newInstance(); PathEvent event = pathEventClass.getConstructor().newInstance();
event.setType(base); event.setKey(base);
event.setFile(file); event.setFile(file);
event.setSuffix(suffix); event.setSuffix(suffix);
event.setPath(path); // remember directory for fail() and success() event.setPath(path); // remember directory for fail() and success()

@ -138,12 +138,12 @@ public class DefaultSyslogMessage implements SyslogMessage {
} }
@Override @Override
public void setType(String key) { public void setKey(String key) {
// ignore // ignore
} }
@Override @Override
public String getType() { public String getKey() {
return builder.messageId; return builder.messageId;
} }

@ -72,6 +72,7 @@ public class MessageEncoder extends MessageToMessageEncoder<Message> {
buffer.writeCharSequence(kvp.getValue().toString(), charset); buffer.writeCharSequence(kvp.getValue().toString(), charset);
index++; index++;
} }
output.add(buffer); output.add(buffer);
} }

@ -1,7 +1,6 @@
package org.xbib.event.syslog; package org.xbib.event.syslog;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.AsyncEventBus;
import org.xbib.event.common.EventManagerService;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
import java.io.Closeable; import java.io.Closeable;
@ -9,22 +8,16 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
public class SyslogEventManagerService implements EventManagerService, Closeable { public class SyslogEventManager implements Closeable {
private static final Logger logger = Logger.getLogger(SyslogEventManagerService.class.getName()); private static final Logger logger = Logger.getLogger(SyslogEventManager.class.getName());
private List<SyslogService> syslogServices; private final List<SyslogService> syslogServices;
public SyslogEventManagerService() {} public SyslogEventManager(Settings settings,
AsyncEventBus eventBus) {
@Override
public SyslogEventManagerService init(Settings settings,
EventBus eventBus,
ClassLoader classLoader,
ExecutorService executorService) {
this.syslogServices = new ArrayList<>(); this.syslogServices = new ArrayList<>();
for (Map.Entry<String, Settings> entry : settings.getGroups("event.syslog").entrySet()) { for (Map.Entry<String, Settings> entry : settings.getGroups("event.syslog").entrySet()) {
Settings definition = entry.getValue(); Settings definition = entry.getValue();
@ -38,7 +31,6 @@ public class SyslogEventManagerService implements EventManagerService, Closeable
} }
} }
} }
return this;
} }
@Override @Override

@ -1,14 +1,14 @@
package org.xbib.event.timer; package org.xbib.event.timer;
import org.xbib.event.common.EventImpl; import org.xbib.event.DefaultEvent;
import java.time.Instant; import java.time.Instant;
public class TimerEventImpl extends EventImpl implements TimerEvent { public class DefaultTimerEvent extends DefaultEvent implements TimerEvent {
private Instant instant; private Instant instant;
public TimerEventImpl() { public DefaultTimerEvent() {
} }
@Override @Override

@ -2,7 +2,6 @@ package org.xbib.event.timer;
import org.xbib.event.bus.AsyncEventBus; import org.xbib.event.bus.AsyncEventBus;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.EventBus;
import org.xbib.event.common.EventManagerService;
import org.xbib.event.persistence.FilePersistenceStore; import org.xbib.event.persistence.FilePersistenceStore;
import org.xbib.event.persistence.PersistenceStore; import org.xbib.event.persistence.PersistenceStore;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
@ -17,27 +16,26 @@ import java.time.ZoneId;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
public class TimerEventManagerService implements EventManagerService, Closeable { public class TimerEventManager implements Closeable {
private static final Logger logger = Logger.getLogger(TimerEventManagerService.class.getName()); private static final Logger logger = Logger.getLogger(TimerEventManager.class.getName());
private Map<String, TimerEventService> services; private final Map<String, TimerEventService> services;
public TimerEventManagerService() { public TimerEventManager(Settings settings) {
this(settings, new AsyncEventBus(Executors.newSingleThreadExecutor()),
TimerEventManager.class.getClassLoader(), ZoneId.systemDefault());
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override public TimerEventManager(Settings settings,
public TimerEventManagerService init(Settings settings, EventBus eventBus,
EventBus eventBus, ClassLoader classLoader,
ClassLoader classLoader, ZoneId zoneId) {
ExecutorService executorService) {
this.services = new LinkedHashMap<>(); this.services = new LinkedHashMap<>();
for (Map.Entry<String, Settings> entry : settings.getGroups("event.timer").entrySet()) { for (Map.Entry<String, Settings> entry : settings.getGroups("event.timer").entrySet()) {
String name = entry.getKey(); String name = entry.getKey();
@ -46,43 +44,38 @@ public class TimerEventManagerService implements EventManagerService, Closeable
try { try {
Class<? extends TimerEvent> eventClass = (Class<? extends TimerEvent>) classLoader.loadClass(className); Class<? extends TimerEvent> eventClass = (Class<? extends TimerEvent>) classLoader.loadClass(className);
PersistenceStore<String, Object> persistenceStore = new FilePersistenceStore(timerSettings, name); PersistenceStore<String, Object> persistenceStore = new FilePersistenceStore(timerSettings, name);
services.put(name, new TimerEventService(eventBus, name, eventClass, ZoneId.systemDefault(), persistenceStore)); services.put(name, new TimerEventService(eventBus, name, eventClass, zoneId, persistenceStore));
logger.log(Level.INFO, "timer " + name + " active for timer event class " + className); logger.log(Level.INFO, "timer " + name + " active for timer event class " + className);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.WARNING, "unable to activate timer " + name + ", reason: " + e.getMessage(), e); logger.log(Level.WARNING, "unable to activate timer " + name + ", reason: " + e.getMessage(), e);
} }
} }
return this;
} }
public boolean put(String service, public boolean put(String key, String timeSpec, Map<String,Object> map) throws ParseException, IOException {
String timeSpec, if (services.containsKey(key)) {
Map<String, Object> map) throws ParseException, IOException {
if (services.containsKey(service)) {
Span span = Chronic.parse(timeSpec); Span span = Chronic.parse(timeSpec);
if (span != null) { if (span != null) {
ZonedDateTime zonedDateTime = span.getBeginCalendar(); ZonedDateTime zonedDateTime = span.getBeginCalendar();
services.get(service).schedule(zonedDateTime.toInstant(), map); services.get(key).schedule(zonedDateTime.toInstant(), map);
logger.log(Level.INFO, "scheduled to " + zonedDateTime); logger.log(Level.INFO, "scheduled to " + zonedDateTime);
} else { } else {
logger.log(Level.INFO, "timer event key " + service + ": can not understand time spec " + timeSpec); logger.log(Level.INFO, "timer event key " + key + ": can not understand time spec " + timeSpec);
return false; return false;
} }
return true; return true;
} else { } else {
logger.log(Level.SEVERE, "unknown timer event key: " + service); logger.log(Level.SEVERE, "unknown timer event key: " + key);
} }
return false; return false;
} }
public boolean put(String service, public boolean put(String key, Instant instant, Map<String,Object> map) throws IOException {
Instant instant, if (services.containsKey(key)) {
Map<String, Object> map) throws IOException { services.get(key).schedule(instant, map);
if (services.containsKey(service)) {
services.get(service).schedule(instant, map);
return true; return true;
} else { } else {
logger.log(Level.SEVERE, "unknown timer event key: " + service); logger.log(Level.SEVERE, "unknown timer event key: " + key);
} }
return false; return false;
} }

@ -3,7 +3,7 @@ package org.xbib.event;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.xbib.event.bus.Subscribe; import org.xbib.event.bus.Subscribe;
import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManager;
import org.xbib.event.generic.GenericEventImpl; import org.xbib.event.generic.DefaultGenericEvent;
import org.xbib.event.generic.GenericEvent; import org.xbib.event.generic.GenericEvent;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
@ -24,7 +24,7 @@ public class EventManagerTest {
EventManager eventManager = EventManager.builder(settings) EventManager eventManager = EventManager.builder(settings)
.register(consumer) .register(consumer)
.build(); .build();
eventManager.getGenericEventManagerService().post(new GenericEventImpl(e -> { eventManager.getGenericEventManager().post(new DefaultGenericEvent(e -> {
logger.log(Level.INFO, "received event " + e); logger.log(Level.INFO, "received event " + e);
})); }));
} }
@ -39,7 +39,7 @@ public class EventManagerTest {
.register(consumer) .register(consumer)
.build(); .build();
CompletableFuture<GenericEvent> future = new CompletableFuture<>(); CompletableFuture<GenericEvent> future = new CompletableFuture<>();
eventManager.getGenericEventManagerService().post(new GenericEventImpl(e -> { eventManager.getGenericEventManager().post(new DefaultGenericEvent(e -> {
logger.log(Level.INFO, "received event " + e); logger.log(Level.INFO, "received event " + e);
future.complete(e); future.complete(e);
})); }));
@ -59,7 +59,7 @@ public class EventManagerTest {
.loadEventConsumers() .loadEventConsumers()
.build(); .build();
CompletableFuture<GenericEvent> future = new CompletableFuture<>(); CompletableFuture<GenericEvent> future = new CompletableFuture<>();
eventManager.getGenericEventManagerService().post(new GenericEventImpl(e -> { eventManager.getGenericEventManager().post(new DefaultGenericEvent(e -> {
logger.log(Level.INFO, "received event " + e); logger.log(Level.INFO, "received event " + e);
}), future); }), future);
GenericEvent e = future.get(); GenericEvent e = future.get();
@ -72,7 +72,7 @@ public class EventManagerTest {
} }
@Subscribe @Subscribe
public void onEvent(GenericEventImpl event) { public void onEvent(DefaultGenericEvent event) {
event.received(); event.received();
} }
} }

@ -19,7 +19,8 @@ public class ClockEventManagerTest {
EventManager eventManager = EventManager.builder(settings) EventManager eventManager = EventManager.builder(settings)
.register(clockEventConsumer) .register(clockEventConsumer)
.build(); .build();
ClockEventManager clockEventManager = eventManager.getClockEventManager();
Thread.sleep(90000L); Thread.sleep(90000L);
eventManager.close(); clockEventManager.close();
} }
} }

@ -1,4 +1,4 @@
package org.xbib.event.clock; package org.xbib.event.clock;
public class TestClockEvent extends ClockEventImpl { public class TestClockEvent extends DefaultClockEvent {
} }

@ -28,14 +28,15 @@ public class FileFollowEventManagerTest {
EventManager eventManager = EventManager.builder(settings) EventManager eventManager = EventManager.builder(settings)
.register(consumer) .register(consumer)
.build(); .build();
FileFollowEventManager fileFolloeEventManager = eventManager.getFileFollowEventManager();
Thread.sleep(5000L); Thread.sleep(5000L);
try (BufferedWriter bufferedWriter = Files.newBufferedWriter(path.resolve("test.txt"))) { try (BufferedWriter bufferedWriter = Files.newBufferedWriter(path.resolve("test.txt"))) {
bufferedWriter.write("Hello"); bufferedWriter.write("Hello");
logger.log(Level.INFO, "Hello written"); logger.log(Level.INFO, "Hello written");
} }
Thread.sleep(5000L); Thread.sleep(5000L);
fileFolloeEventManager.close();
Files.delete(path.resolve("test.txt")); Files.delete(path.resolve("test.txt"));
Files.delete(path); Files.delete(path);
eventManager.close();
} }
} }

@ -1,4 +1,4 @@
package org.xbib.event.path; package org.xbib.event.path;
public class TestFileFollowEvent extends FileFollowEventImpl { public class TestFileFollowEvent extends DefaultFileFollowEvent {
} }

@ -1,7 +1,6 @@
package org.xbib.event.syslog.test; package org.xbib.event.syslog;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.xbib.event.syslog.SyslogService;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
import java.io.IOException; import java.io.IOException;
@ -15,7 +14,7 @@ public class SyslogServiceTest {
.build(); .build();
SyslogService syslogService = new SyslogService(settings, null); SyslogService syslogService = new SyslogService(settings, null);
syslogService.startTcp(); syslogService.startTcp();
Thread.sleep(60000L); // send a syslog from console Thread.sleep(60000L);
syslogService.close(); syslogService.close();
} }
} }

@ -1,4 +1,4 @@
package org.xbib.event.timer; package org.xbib.event.timer;
public class TestTimerEvent extends TimerEventImpl { public class TestTimerEvent extends DefaultTimerEvent {
} }

@ -20,7 +20,7 @@ public class TimerEventManagerTest {
EventManager eventManager = EventManager.builder(settings) EventManager eventManager = EventManager.builder(settings)
.register(consumer) .register(consumer)
.build(); .build();
TimerEventManagerService timerEventManager = eventManager.getTimerEventManagerService(); TimerEventManager timerEventManager = eventManager.getTimerEventManager();
timerEventManager.put("testtimerevent", Instant.now().plusSeconds(5L), Map.of("a", "b")); timerEventManager.put("testtimerevent", Instant.now().plusSeconds(5L), Map.of("a", "b"));
Thread.sleep(10000L); Thread.sleep(10000L);
timerEventManager.close(); timerEventManager.close();

@ -1,6 +0,0 @@
dependencies {
api project(':event-common')
api libs.net.http.server.netty
api libs.net.http.client.netty
implementation libs.datastructures.json.tiny
}

@ -1,13 +0,0 @@
module org.xbib.event.net.http {
exports org.xbib.event.net.http;
requires org.xbib.event.api;
requires org.xbib.event.common;
requires org.xbib.net.http;
requires org.xbib.net.http.client;
requires org.xbib.net.http.client.netty;
requires org.xbib.net.http.client.netty.secure;
requires org.xbib.net.http.server;
requires org.xbib.net.http.server.netty;
requires org.xbib.net.http.server.netty.secure;
requires java.logging;
}

@ -1,44 +0,0 @@
package org.xbib.event.net.http;
import org.xbib.event.common.EventManager;
import org.xbib.event.common.EventImpl;
import org.xbib.net.http.HttpHeaderNames;
import org.xbib.net.http.HttpHeaderValues;
import org.xbib.net.http.HttpMethod;
import org.xbib.net.http.server.service.BaseHttpService;
import org.xbib.net.http.server.service.HttpService;
import java.nio.charset.StandardCharsets;
import static org.xbib.net.http.HttpResponseStatus.NOT_FOUND;
import static org.xbib.net.http.HttpResponseStatus.OK;
public class EventReceiverService {
private final EventManager eventManager;
public EventReceiverService(EventManager eventManager) {
this.eventManager = eventManager;
}
public HttpService createService(String prefix) {
return BaseHttpService.builder()
.setPrefix(prefix)
.setPath("/event/{type}")
.setMethod(HttpMethod.POST)
.setHandler(ctx -> {
EventImpl event = EventImpl.fromJson(ctx.getRequest().asJson());
if (event.isNullEvent()) {
ctx.status(NOT_FOUND).done();
} else {
eventManager.submit(event);
ctx.status(OK)
.header(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
.charset(StandardCharsets.UTF_8)
.body(event.toJson())
.done();
}
})
.build();
}
}

@ -1,5 +0,0 @@
dependencies {
api project(':event-common')
implementation libs.datastructures.json.tiny
implementation libs.netty.handler
}

@ -1,17 +0,0 @@
import org.xbib.event.common.EventManagerService;
import org.xbib.event.syslog.SyslogEventManagerService;
module org.xbib.event.syslog {
exports org.xbib.event.syslog;
provides EventManagerService with SyslogEventManagerService;
requires org.xbib.event.common;
requires org.xbib.datastructures.json.tiny;
requires io.netty.buffer;
requires io.netty.common;
requires io.netty.transport;
requires io.netty.handler;
requires io.netty.codec;
requires java.logging;
requires org.xbib.settings.api;
requires org.xbib.event.api;
}

@ -1,3 +1,3 @@
group = org.xbib group = org.xbib
name = event name = event
version = 0.2.0 version = 0.1.0

@ -18,7 +18,6 @@ dependencyResolutionManagement {
version('gradle', '8.5') version('gradle', '8.5')
version('datastructures', '5.0.6') version('datastructures', '5.0.6')
version('net', '4.0.4') version('net', '4.0.4')
version('net-http', '4.1.0')
version('netty', '4.1.104.Final') version('netty', '4.1.104.Final')
library('netty-handler', 'io.netty', 'netty-handler').versionRef('netty') library('netty-handler', 'io.netty', 'netty-handler').versionRef('netty')
library('net', 'org.xbib', 'net').versionRef('net') library('net', 'org.xbib', 'net').versionRef('net')
@ -27,8 +26,6 @@ dependencyResolutionManagement {
library('settings-api', 'org.xbib', 'settings-api').versionRef('datastructures') library('settings-api', 'org.xbib', 'settings-api').versionRef('datastructures')
library('settings-datastructures-json', 'org.xbib', 'settings-datastructures-json').versionRef('datastructures') library('settings-datastructures-json', 'org.xbib', 'settings-datastructures-json').versionRef('datastructures')
library('time', 'org.xbib', 'time').version('4.0.0') library('time', 'org.xbib', 'time').version('4.0.0')
library('net-http-server-netty', 'org.xbib', 'net-http-server-netty-secure').versionRef('net-http')
library('net-http-client-netty', 'org.xbib', 'net-http-client-netty-secure').versionRef('net-http')
library('reactivestreams', 'org.reactivestreams', 'reactive-streams').version('1.0.3') library('reactivestreams', 'org.reactivestreams', 'reactive-streams').version('1.0.3')
library('rxjava3', 'io.reactivex.rxjava3', 'rxjava').version('3.0.3') library('rxjava3', 'io.reactivex.rxjava3', 'rxjava').version('3.0.3')
} }
@ -48,4 +45,3 @@ include 'event-async'
include 'event-common' include 'event-common'
include 'event-loop' include 'event-loop'
include 'event-net-http' include 'event-net-http'
include 'event-syslog'

Loading…
Cancel
Save