Compare commits

...

10 commits

50 changed files with 1631 additions and 468 deletions

View file

@ -13,7 +13,7 @@ ext {
user = 'joerg' user = 'joerg'
name = 'event' name = 'event'
description = 'Event framework for Java (NIO paths, files, timers, journals)' description = 'Event framework for Java (NIO paths, files, timers, journals)'
inceptionYear = '2021' inceptionYear = '2024'
url = 'https://xbib.org/' + user + '/' + name url = 'https://xbib.org/' + user + '/' + name
scmUrl = 'https://xbib.org/' + user + '/' + name scmUrl = 'https://xbib.org/' + user + '/' + name
scmConnection = 'scm:git:git://xbib.org/' + user + '/' + name + '.git' scmConnection = 'scm:git:git://xbib.org/' + user + '/' + name + '.git'
@ -28,20 +28,7 @@ subprojects {
apply from: rootProject.file('gradle/compile/java.gradle') apply from: rootProject.file('gradle/compile/java.gradle')
apply from: rootProject.file('gradle/test/junit5.gradle') apply from: rootProject.file('gradle/test/junit5.gradle')
apply from: rootProject.file('gradle/repositories/maven.gradle') apply from: rootProject.file('gradle/repositories/maven.gradle')
apply from: rootProject.file('gradle/publish/maven.gradle')
} }
apply from: rootProject.file('gradle/publish/sonatype.gradle') apply from: rootProject.file('gradle/publish/sonatype.gradle')
apply from: rootProject.file('gradle/publish/forgejo.gradle') apply from: rootProject.file('gradle/publish/forgejo.gradle')
/*
dependencies {
api libs.settings.api
implementation libs.net
implementation libs.time
implementation libs.datastructures.common
implementation libs.datastructures.json.tiny
implementation libs.netty.handler
implementation libs.reactivestreams
testImplementation libs.rxjava3
testImplementation libs.settings.datastructures.json
}
*/

View file

@ -16,9 +16,7 @@ public interface Event {
String getMessage(); String getMessage();
Instant getCreated(); Instant getInstant();
Instant getScheduledFor();
Payload getPayload(); Payload getPayload();

View file

@ -0,0 +1,8 @@
package org.xbib.event;
public interface GenericEvent extends Event {
GenericEvent setListener(Listener listener);
void received();
}

View file

@ -1,16 +1,6 @@
package org.xbib.event; package org.xbib.event;
import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
@SuppressWarnings("serial") public interface Payload extends Map<String, Object> {
public class Payload extends LinkedHashMap<String, Object> {
public Payload() {
super();
}
public Payload(Map<String, Object> map) {
super(map);
}
} }

View file

@ -1,7 +1,6 @@
dependencies { dependencies {
api project(':event-api') api project(':event-api')
api libs.settings.api api libs.settings.api
implementation libs.net
implementation libs.time implementation libs.time
implementation libs.settings.datastructures.json implementation libs.settings.datastructures.json
implementation libs.datastructures.common implementation libs.datastructures.common

View file

@ -1,24 +1,26 @@
import org.xbib.event.common.EventManagerService; import org.xbib.event.common.EventManagerService;
import org.xbib.event.EventConsumer;
import org.xbib.event.Event;
module org.xbib.event.common { module org.xbib.event.common {
requires java.logging;
requires org.xbib.event.api;
requires org.xbib.settings.api;
requires org.xbib.settings.datastructures.json;
requires org.xbib.time;
requires org.xbib.datastructures.common;
requires org.xbib.datastructures.tiny;
requires org.xbib.datastructures.json.tiny;
exports org.xbib.event.bus; exports org.xbib.event.bus;
exports org.xbib.event.clock; exports org.xbib.event.clock;
exports org.xbib.event.common; exports org.xbib.event.common;
exports org.xbib.event.generic;
exports org.xbib.event.log; exports org.xbib.event.log;
exports org.xbib.event.path; exports org.xbib.event.path;
exports org.xbib.event.persistence; exports org.xbib.event.persistence;
exports org.xbib.event.timer; exports org.xbib.event.timer;
exports org.xbib.event.util;
exports org.xbib.event.wal; exports org.xbib.event.wal;
uses Event;
uses EventConsumer;
uses EventManagerService; uses EventManagerService;
uses org.xbib.event.EventConsumer;
uses org.xbib.event.Event;
requires org.xbib.event.api;
requires org.xbib.settings.api;
requires org.xbib.settings.datastructures.json;
requires org.xbib.net;
requires org.xbib.time;
requires org.xbib.datastructures.common;
requires org.xbib.datastructures.json.tiny;
requires java.logging;
} }

View file

@ -1,6 +1,8 @@
package org.xbib.event.clock; package org.xbib.event.clock;
import org.xbib.event.bus.EventBus; import java.util.List;
import org.xbib.event.Event;
import org.xbib.event.common.AbstractEventManagerService;
import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManager;
import org.xbib.event.common.EventManagerService; import org.xbib.event.common.EventManagerService;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
@ -8,30 +10,29 @@ import org.xbib.time.schedule.CronExpression;
import org.xbib.time.schedule.CronSchedule; import org.xbib.time.schedule.CronSchedule;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.xbib.time.schedule.Entry;
public class ClockEventManagerService implements EventManagerService { public class ClockEventManagerService extends AbstractEventManagerService implements EventManagerService {
private static final Logger logger = Logger.getLogger(ClockEventManagerService.class.getName()); private static final Logger logger = Logger.getLogger(ClockEventManagerService.class.getName());
private CronSchedule<Integer> cronSchedule; private EventManager eventManager;
private List<String> suspended; private CronSchedule<Boolean> cronSchedule;
public ClockEventManagerService() { public ClockEventManagerService() {
super();
} }
@Override
public ClockEventManagerService init(EventManager eventManager) { public ClockEventManagerService init(EventManager eventManager) {
Settings settings = eventManager.getSettings(); this.eventManager = eventManager;
EventBus eventBus = eventManager.getEventBus();
this.suspended = new ArrayList<>();
ThreadFactory threadFactory = new ThreadFactory() { ThreadFactory threadFactory = new ThreadFactory() {
int n = 1; int n = 1;
@Override @Override
@ -40,45 +41,44 @@ public class ClockEventManagerService implements EventManagerService {
} }
}; };
ScheduledExecutorService scheduledExecutorService = ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(settings.getAsInt("pool.size", 2), threadFactory); Executors.newScheduledThreadPool(eventManager.getSettings().getAsInt("pool.size", 2), threadFactory);
this.cronSchedule = new CronSchedule<>(scheduledExecutorService); this.cronSchedule = new CronSchedule<>(scheduledExecutorService);
for (Map.Entry<String, Settings> mapEntry : settings.getGroups("event.clock").entrySet()) { for (Map.Entry<String, Settings> mapEntry : eventManager.getSettings().getGroups("event.clock").entrySet()) {
String name = mapEntry.getKey(); String name = mapEntry.getKey();
Settings entrySettings = mapEntry.getValue(); Settings entrySettings = mapEntry.getValue();
if (entrySettings.getAsBoolean("enabled", true)) { if (entrySettings.getAsBoolean("enabled", true)) {
String entry = entrySettings.get("entry"); String entry = entrySettings.get("entry");
if (entry != null) { String type = entrySettings.get("type", "clock");
if (entry != null && type != null) {
try { try {
ClockEventService clockEventService = new ClockEventService(this, eventBus, name); ClockEventService clockEventService = new ClockEventService(this, name, type);
cronSchedule.add(name, CronExpression.parse(entry), clockEventService); cronSchedule.add(name, CronExpression.parse(entry), clockEventService);
logger.log(Level.INFO, "cron job " + name + " scheduled on " + entry); logger.log(Level.INFO, "cron job " + name + "with type " + type + " scheduled on " + entry);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.WARNING, "unable to schedule cron job " + mapEntry.getKey() + ", reason " + e.getMessage()); logger.log(Level.WARNING, "unable to schedule cron job " + mapEntry.getKey() + ", reason " + e.getMessage());
} }
} else {
logger.log(Level.WARNING, "clock event service in configuration is incompletely defined, name = " + name );
} }
} else { } else {
logger.log(Level.WARNING, "clock event service " + name + " in configuration not enabled"); logger.log(Level.WARNING, "clock event service in configuration not enabled, name = " + name );
} }
} }
logger.log(Level.INFO, "entries = " + cronSchedule.getEntries());
cronSchedule.start(); cronSchedule.start();
logger.log(Level.INFO, "after init: entries = " + cronSchedule.getEntries());
return this; return this;
} }
public List<String> getSuspended() {
return suspended;
}
public void suspend(String name) {
suspended.add(name);
}
public void resume(String name) {
suspended.remove(name);
}
@Override @Override
public void shutdown() throws IOException { public void shutdown() throws IOException {
cronSchedule.close(); cronSchedule.close();
} }
public void publish(Event event) {
eventManager.publish(event);
}
public List<Entry<Boolean>> getCronScheduleEntries() {
return cronSchedule.getEntries();
}
} }

View file

@ -1,47 +1,48 @@
package org.xbib.event.clock; package org.xbib.event.clock;
import java.util.Objects;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.xbib.event.Event; import org.xbib.event.Event;
import org.xbib.event.bus.EventBus;
import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManager;
public class ClockEventService implements Callable<Integer> { public class ClockEventService implements Callable<Boolean> {
private static final Logger logger = Logger.getLogger(ClockEventService.class.getName()); private static final Logger logger = Logger.getLogger(ClockEventService.class.getName());
private final ClockEventManagerService manager; private final ClockEventManagerService clockEventManagerService;
private final EventBus eventBus;
private final String name; private final String name;
public ClockEventService(ClockEventManagerService manager, private final String eventType;
EventBus eventBus,
String name) { public ClockEventService(ClockEventManagerService clockEventManagerService,
this.manager = manager; String name,
this.eventBus = eventBus; String eventType) {
this.clockEventManagerService = clockEventManagerService;
this.name = name; this.name = name;
this.eventType = Objects.requireNonNull(eventType, "clock event type must not be null");
} }
@Override @Override
public Integer call() { public Boolean call() {
try { try {
if (manager.getSuspended().contains(name)) { if (clockEventManagerService.getSuspended().contains(name)) {
logger.log(Level.FINE, "clock event " + name + " suspended"); logger.log(Level.FINE, "clock event service " + name + " is suspended, unable to continue");
return 1; return false;
} else { } else {
Event clockEvent = EventManager.eventBuilder() Event clockEvent = EventManager.eventBuilder()
.setType("clock") .setType(eventType)
.build(); .build();
eventBus.post(clockEvent); clockEventManagerService.publish(clockEvent);
return 0; return true;
} }
} catch (Throwable t) { } catch (Throwable t) {
logger.log(Level.WARNING, t.getMessage(), t); logger.log(Level.WARNING, t.getMessage(), t);
return 1; return false;
} }
} }
} }

View file

@ -1,23 +0,0 @@
package org.xbib.event.clock;
import org.xbib.event.ClockEvent;
import org.xbib.event.EventConsumer;
import java.util.logging.Logger;
import org.xbib.event.bus.AllowConcurrentEvents;
import org.xbib.event.bus.Subscribe;
public class SimpleClockEventConsumer implements EventConsumer {
private static final Logger logger = Logger.getLogger(SimpleClockEventConsumer.class.getName());
public SimpleClockEventConsumer() {
}
@Subscribe
@AllowConcurrentEvents
void onEvent(ClockEvent event) {
logger.info("received demo clock event, created = " + event.getCreated());
}
}

View file

@ -0,0 +1,25 @@
package org.xbib.event.common;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
public abstract class AbstractEventManagerService {
private List<String> suspended;
public AbstractEventManagerService() {
this.suspended = new CopyOnWriteArrayList<>();
}
public void suspend(String name) {
suspended.add(name);
}
public void resume(String name) {
suspended.remove(name);
}
public List<String> getSuspended() {
return suspended;
}
}

View file

@ -3,6 +3,7 @@ package org.xbib.event.common;
import java.lang.reflect.InvocationTargetException; import java.lang.reflect.InvocationTargetException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption; import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime; import java.nio.file.attribute.FileTime;
import java.time.Instant; import java.time.Instant;
@ -11,16 +12,17 @@ import java.util.Map;
import java.util.ServiceLoader; import java.util.ServiceLoader;
import org.xbib.datastructures.json.tiny.Json; import org.xbib.datastructures.json.tiny.Json;
import org.xbib.datastructures.json.tiny.JsonBuilder;
import org.xbib.event.Event; import org.xbib.event.Event;
import org.xbib.event.EventConsumer; import org.xbib.event.EventConsumer;
import org.xbib.event.Listener; import org.xbib.event.Listener;
import org.xbib.event.Payload;
import org.xbib.event.bus.AsyncEventBus; import org.xbib.event.bus.AsyncEventBus;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.EventBus;
import org.xbib.event.bus.Subscriber;
import org.xbib.event.bus.SubscriberExceptionContext; import org.xbib.event.bus.SubscriberExceptionContext;
import org.xbib.event.bus.SubscriberExceptionHandler; import org.xbib.event.bus.SubscriberExceptionHandler;
import org.xbib.event.bus.SubscriberRegistry;
import org.xbib.event.clock.ClockEventManagerService; import org.xbib.event.clock.ClockEventManagerService;
import org.xbib.event.generic.GenericEventManagerService;
import org.xbib.event.path.FileFollowEventManagerService; import org.xbib.event.path.FileFollowEventManagerService;
import org.xbib.event.path.PathEventManagerService; import org.xbib.event.path.PathEventManagerService;
import org.xbib.event.timer.TimerEventManagerService; import org.xbib.event.timer.TimerEventManagerService;
@ -31,12 +33,15 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.xbib.time.schedule.Entry;
public final class EventManager { public final class EventManager extends AbstractEventManagerService implements EventManagerService {
private static final Logger logger = Logger.getLogger(EventManager.class.getName()); private static final Logger logger = Logger.getLogger(EventManager.class.getName());
@ -47,6 +52,7 @@ public final class EventManager {
private final Map<Class<? extends EventManagerService>, EventManagerService> eventManagerServices; private final Map<Class<? extends EventManagerService>, EventManagerService> eventManagerServices;
private EventManager(EventManagerBuilder builder) { private EventManager(EventManagerBuilder builder) {
super();
this.builder = builder; this.builder = builder;
eventTypes.put("null", NullEvent.class); eventTypes.put("null", NullEvent.class);
eventTypes.put("generic", GenericEventImpl.class); eventTypes.put("generic", GenericEventImpl.class);
@ -59,7 +65,7 @@ public final class EventManager {
} }
logger.log(Level.INFO, "installed events = " + eventTypes.keySet()); logger.log(Level.INFO, "installed events = " + eventTypes.keySet());
this.eventManagerServices = new HashMap<>(); this.eventManagerServices = new HashMap<>();
eventManagerServices.put(GenericEventManagerService.class, new GenericEventManagerService().init(this)); eventManagerServices.put(this.getClass(), this);
eventManagerServices.put(ClockEventManagerService.class, new ClockEventManagerService().init(this)); eventManagerServices.put(ClockEventManagerService.class, new ClockEventManagerService().init(this));
eventManagerServices.put(TimerEventManagerService.class, new TimerEventManagerService().init(this)); eventManagerServices.put(TimerEventManagerService.class, new TimerEventManagerService().init(this));
eventManagerServices.put(FileFollowEventManagerService.class, new FileFollowEventManagerService().init(this)); eventManagerServices.put(FileFollowEventManagerService.class, new FileFollowEventManagerService().init(this));
@ -70,6 +76,70 @@ public final class EventManager {
logger.log(Level.INFO, "installed event service managers = " + eventManagerServices.keySet()); logger.log(Level.INFO, "installed event service managers = " + eventManagerServices.keySet());
} }
@SuppressWarnings("unchecked")
public static <E extends Event> E eventOf(String eventType, Class<E> eventClass) {
return (E) eventBuilder()
.setType(eventType)
.build();
}
public static Event eventOf(String eventType,
String code,
String message,
Path path) {
return eventBuilder()
.setType(eventType)
.setCode(code)
.setMessage(message)
.setPath(path)
.setPayload(PayloadImpl.fromPath(path))
.build();
}
public static Event eventOf(String eventType,
Instant instant) {
return eventBuilder()
.setType(eventType)
.setInstant(instant)
.build();
}
public static Event eventFromFile(Path file) throws IOException {
return eventFromJson(Files.readString(file));
}
@SuppressWarnings("unchecked")
public static Event eventFromJson(String json) {
Map<String, Object> map = Json.toMap(json);
EventBuilder builder = eventBuilder();
if (map.containsKey("type")) {
builder.setType(map.getOrDefault("type", "generic").toString());
}
if (map.containsKey("code")) {
builder.setCode(map.getOrDefault("code", "").toString());
}
if (map.containsKey("message")) {
builder.setMessage(map.getOrDefault("message", "").toString());
}
if (map.containsKey("instant")) {
String instant = map.getOrDefault("instant", "").toString();
builder.setInstant(Instant.parse(instant));
}
if (map.containsKey("payload")) {
PayloadImpl payload = new PayloadImpl((Map<String, Object>) map.get("payload"));
builder.setPayload(payload);
}
if (map.containsKey("path")) {
Path path = Paths.get((String) map.get("path"));
builder.setPath(path);
}
return builder.build();
}
public EventManagerService init(EventManager eventManager) {
return this;
}
public static EventManagerBuilder builder() { public static EventManagerBuilder builder() {
return new EventManagerBuilder(); return new EventManagerBuilder();
} }
@ -94,19 +164,11 @@ public final class EventManager {
return builder.executorService; return builder.executorService;
} }
public void dispatch(Event event) {
getGenericEventManagerService().post(event);
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <T extends EventManagerService> T getEventManagerService(Class<T> cl) { public <T extends EventManagerService> T getEventManagerService(Class<T> cl) {
return (T) eventManagerServices.get(cl); return (T) eventManagerServices.get(cl);
} }
public GenericEventManagerService getGenericEventManagerService() {
return (GenericEventManagerService) eventManagerServices.get(GenericEventManagerService.class);
}
public ClockEventManagerService getClockEventManagerService() { public ClockEventManagerService getClockEventManagerService() {
return (ClockEventManagerService) eventManagerServices.get(ClockEventManagerService.class); return (ClockEventManagerService) eventManagerServices.get(ClockEventManagerService.class);
} }
@ -123,7 +185,24 @@ public final class EventManager {
return (PathEventManagerService) eventManagerServices.get(PathEventManagerService.class); return (PathEventManagerService) eventManagerServices.get(PathEventManagerService.class);
} }
public void close() throws IOException { public void publish(Event event) {
getEventBus().post(event);
}
public void publish(GenericEventImpl event,
CompletableFuture<Event> future) {
SubscriberRegistry subscriberRegistry = getEventBus().getSubscribers();
Set<Subscriber> set = subscriberRegistry.getSubscribersForTesting(event.getClass());
event.setListener(new WrappedListener(event.getListener(), set.size(), future));
publish(event);
}
public List<Entry<Boolean>> getCronScheduleEntries() {
ClockEventManagerService clockEventManagerService = getClockEventManagerService();
return clockEventManagerService != null ? clockEventManagerService.getCronScheduleEntries() : List.of();
}
public void shutdown() throws IOException {
for (EventConsumer eventConsumer : builder.eventConsumers) { for (EventConsumer eventConsumer : builder.eventConsumers) {
if (eventConsumer instanceof Closeable closeable) { if (eventConsumer instanceof Closeable closeable) {
closeable.close(); closeable.close();
@ -131,7 +210,9 @@ public final class EventManager {
} }
for (EventManagerService service : eventManagerServices.values()) { for (EventManagerService service : eventManagerServices.values()) {
try { try {
service.shutdown(); if (service != this) {
service.shutdown();
}
} catch (IOException e) { } catch (IOException e) {
logger.log(Level.SEVERE, e.getMessage(), e); logger.log(Level.SEVERE, e.getMessage(), e);
} }
@ -244,36 +325,31 @@ public final class EventManager {
} }
} }
public static Event eventOf(String eventType, private static class WrappedListener implements Listener {
String code,
String message,
Path path) {
return eventBuilder()
.setType(eventType)
.setCode(code)
.setMessage(message)
.setPath(path)
.build();
}
public static Event eventOf(String eventType, private final Listener listener;
Instant scheduled) {
return eventBuilder()
.setType(eventType)
.setScheduledFor(scheduled)
.build();
}
public static Event eventFromFile(Path file) throws IOException { private int size;
return eventFromJson(Files.readString(file));
}
public static Event eventFromJson(String json) { private final CompletableFuture<Event> future;
Map<String, Object> map = Json.toMap(json);
return eventBuilder() public WrappedListener(Listener listener, int size, CompletableFuture<Event> future) {
.setType(map.getOrDefault("type", "generic").toString()) this.listener = listener;
.setPayload(new Payload(map)) this.size = size;
.build(); this.future = future;
}
@Override
public void listen(Event event) {
if (listener != null) {
listener.listen(event);
} else {
logger.log(Level.WARNING, "listener not set");
}
if (--size == 0) {
future.complete(event);
}
}
} }
public static class EventImpl implements Event { public static class EventImpl implements Event {
@ -304,18 +380,13 @@ public final class EventManager {
} }
@Override @Override
public Payload getPayload() { public PayloadImpl getPayload() {
return builder.payload; return builder.payload;
} }
@Override @Override
public Instant getCreated() { public Instant getInstant() {
return builder.created; return builder.instant;
}
@Override
public Instant getScheduledFor() {
return builder.scheduled;
} }
@Override @Override
@ -338,8 +409,24 @@ public final class EventManager {
return builder.fileSize; return builder.fileSize;
} }
@Override
public String toJson() throws IOException { public String toJson() throws IOException {
return Json.toString(builder.payload); JsonBuilder builder = JsonBuilder.builder();
builder.beginMap();
builder.fieldIfNotNull("type", getType());
builder.fieldIfNotNull("code", getCode());
builder.fieldIfNotNull("message", getMessage());
if (getPayload() != null && !getPayload().isEmpty()) {
builder.buildKey("payload").buildMap(getPayload());
}
if (getInstant() != null) {
builder.fieldIfNotNull("instant", getInstant().toString());
}
if (getPath() != null) {
builder.fieldIfNotNull("path", getPath().toAbsolutePath().toString());
}
builder.endMap();
return builder.build();
} }
@Override @Override
@ -395,11 +482,9 @@ public final class EventManager {
String message; String message;
Instant scheduled; Instant instant;
Instant created; PayloadImpl payload;
Payload payload;
Path path; Path path;
@ -439,12 +524,12 @@ public final class EventManager {
return this; return this;
} }
public EventBuilder setScheduledFor(Instant scheduled) { public EventBuilder setInstant(Instant instant) {
this.scheduled = scheduled; this.instant = instant;
return this; return this;
} }
public EventBuilder setPayload(Payload payload) { public EventBuilder setPayload(PayloadImpl payload) {
this.payload = payload; this.payload = payload;
return this; return this;
} }
@ -482,7 +567,7 @@ public final class EventManager {
cl = NullEvent.class; cl = NullEvent.class;
} }
try { try {
this.created = Instant.now(); this.instant = Instant.now();
if (listener != null) { if (listener != null) {
return cl.getDeclaredConstructor(EventBuilder.class, Listener.class).newInstance(this, listener); return cl.getDeclaredConstructor(EventBuilder.class, Listener.class).newInstance(this, listener);
} else { } else {
@ -507,5 +592,4 @@ public final class EventManager {
return pos >= 0 ? name.substring(pos + 1) : null; return pos >= 0 ? name.substring(pos + 1) : null;
} }
} }
} }

View file

@ -1,28 +1,27 @@
package org.xbib.event.common; package org.xbib.event.common;
import org.xbib.event.GenericEvent;
import org.xbib.event.Listener; import org.xbib.event.Listener;
import java.util.Objects; import java.util.Objects;
public class GenericEventImpl extends EventManager.EventImpl { public class GenericEventImpl extends EventManager.EventImpl implements GenericEvent {
private final EventManager.EventBuilder builder; private final EventManager.EventBuilder builder;
public GenericEventImpl(EventManager.EventBuilder builder) {
this(builder, null);
}
public GenericEventImpl(EventManager.EventBuilder builder, Listener listener) { public GenericEventImpl(EventManager.EventBuilder builder, Listener listener) {
super(builder); super(builder);
this.builder = builder; this.builder = builder;
this.builder.listener = Objects.requireNonNull(listener); this.builder.listener = listener;
} }
@Override
public GenericEventImpl setListener(Listener listener) { public GenericEventImpl setListener(Listener listener) {
this.builder.listener = Objects.requireNonNull(listener); this.builder.listener = Objects.requireNonNull(listener);
return this; return this;
} }
@Override
public void received() { public void received() {
builder.listener.listen(this); builder.listener.listen(this);
} }

View file

@ -0,0 +1,35 @@
package org.xbib.event.common;
import org.xbib.datastructures.json.tiny.Json;
import org.xbib.event.Payload;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;
@SuppressWarnings("serial")
public class PayloadImpl extends LinkedHashMap<String, Object> implements Payload {
public PayloadImpl() {
super();
}
public PayloadImpl(Map<String, Object> map) {
super(map);
}
public static PayloadImpl fromPath(Path path) {
PayloadImpl payload = new PayloadImpl();
try (InputStream inputStream = Files.newInputStream(path)) {
String content = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
payload.putAll(Json.toMap(content));
} catch (IOException e) {
throw new IllegalArgumentException("broken json content in path " + path);
}
return payload;
}
}

View file

@ -1,77 +0,0 @@
package org.xbib.event.generic;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.xbib.event.Event;
import org.xbib.event.Listener;
import org.xbib.event.bus.EventBus;
import org.xbib.event.bus.Subscriber;
import org.xbib.event.bus.SubscriberRegistry;
import org.xbib.event.common.EventManager;
import org.xbib.event.common.EventManagerService;
import org.xbib.event.common.GenericEventImpl;
public class GenericEventManagerService implements EventManagerService {
private static final Logger logger = Logger.getLogger(GenericEventManagerService.class.getName());
private EventBus eventBus;
public GenericEventManagerService() {
}
@Override
public GenericEventManagerService init(EventManager eventManager) {
this.eventBus = eventManager.getEventBus();
return this;
}
@Override
public void shutdown() throws IOException {
}
public void post(Object event) {
eventBus.post(event);
}
public void post(GenericEventImpl event,
CompletableFuture<Event> future) {
SubscriberRegistry subscriberRegistry = eventBus.getSubscribers();
Set<Subscriber> set = subscriberRegistry.getSubscribersForTesting(event.getClass());
logger.log(Level.INFO, "set = " + set);
event.setListener(new WrappedListener(event.getListener(), set.size(), future));
post(event);
}
static class WrappedListener implements Listener {
private final Listener listener;
private int size;
private final CompletableFuture<Event> future;
public WrappedListener(Listener listener, int size, CompletableFuture<Event> future) {
this.listener = listener;
this.size = size;
this.future = future;
}
@Override
public void listen(Event event) {
if (listener != null) {
listener.listen(event);
} else {
logger.log(Level.WARNING, "listener not set");
}
if (--size == 0) {
future.complete(event);
}
}
}
}

View file

@ -1,6 +1,7 @@
package org.xbib.event.path; package org.xbib.event.path;
import org.xbib.event.bus.EventBus; import org.xbib.event.Event;
import org.xbib.event.common.AbstractEventManagerService;
import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManager;
import org.xbib.event.common.EventManagerService; import org.xbib.event.common.EventManagerService;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
@ -17,24 +18,28 @@ import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import java.util.regex.Pattern; import java.util.regex.Pattern;
public class FileFollowEventManagerService implements EventManagerService { public class FileFollowEventManagerService extends AbstractEventManagerService implements EventManagerService {
private static final Logger logger = Logger.getLogger(FileFollowEventManagerService.class.getName()); private static final Logger logger = Logger.getLogger(FileFollowEventManagerService.class.getName());
private Map<Future<?>, FileFollowEventService> eventServiceMap; private EventManager eventManager;
private final Map<Future<?>, FileFollowEventService> services;
public FileFollowEventManagerService() { public FileFollowEventManagerService() {
super();
this.services = new LinkedHashMap<>();
} }
@Override @Override
public FileFollowEventManagerService init(EventManager eventManager) { public FileFollowEventManagerService init(EventManager eventManager) {
Settings settings = eventManager.getSettings(); this.eventManager = eventManager;
EventBus eventBus = eventManager.getEventBus();
ExecutorService executorService = eventManager.getExecutorService(); ExecutorService executorService = eventManager.getExecutorService();
this.eventServiceMap = new LinkedHashMap<>(); for (Map.Entry<String, Settings> entry : eventManager.getSettings().getGroups("event.filefollow").entrySet()) {
for (Map.Entry<String, Settings> entry : settings.getGroups("event.filefollow").entrySet()) { String name = entry.getKey();
Settings definition = entry.getValue(); Settings definition = entry.getValue();
if (definition.getAsBoolean("enabled", true)) { if (definition.getAsBoolean("enabled", true)) {
String type = definition.get("type", "filefollow");
String baseStr = definition.get("base"); String baseStr = definition.get("base");
Objects.requireNonNull(baseStr); Objects.requireNonNull(baseStr);
String patternStr = definition.get("pattern"); String patternStr = definition.get("pattern");
@ -42,21 +47,25 @@ public class FileFollowEventManagerService implements EventManagerService {
try { try {
Path base = Paths.get(baseStr); Path base = Paths.get(baseStr);
Pattern pattern = Pattern.compile(patternStr); Pattern pattern = Pattern.compile(patternStr);
FileFollowEventService fileFollowEventService = new FileFollowEventService(definition, eventBus, base, pattern); FileFollowEventService fileFollowEventService = new FileFollowEventService(this,
type, name, base, pattern);
Future<?> future = executorService.submit(fileFollowEventService); Future<?> future = executorService.submit(fileFollowEventService);
eventServiceMap.put(future, fileFollowEventService); services.put(future, fileFollowEventService);
logger.log(Level.INFO, "file follow service " + entry.getKey() + " with base " + base + " and pattern " + pattern + " added"); logger.log(Level.INFO, "file follow service " + name +
" with type " + type +
" with base " + base + " and pattern " + pattern + " added");
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "unable to create file follow service " + entry.getKey() + ", reason " + e.getMessage(), e); logger.log(Level.SEVERE, "unable to create file follow service " +name + ", reason " + e.getMessage(), e);
} }
} }
} }
logger.log(Level.INFO, "after init: event services = " + services);
return this; return this;
} }
@Override @Override
public void shutdown() throws IOException { public void shutdown() throws IOException {
for (Map.Entry<Future<?>, FileFollowEventService> entry : eventServiceMap.entrySet()) { for (Map.Entry<Future<?>, FileFollowEventService> entry : services.entrySet()) {
try { try {
entry.getValue().setKeepWatching(false); entry.getValue().setKeepWatching(false);
entry.getKey().cancel(true); entry.getKey().cancel(true);
@ -65,4 +74,8 @@ public class FileFollowEventManagerService implements EventManagerService {
} }
} }
} }
public void publish(Event event) {
eventManager.publish(event);
}
} }

View file

@ -1,9 +1,7 @@
package org.xbib.event.path; package org.xbib.event.path;
import org.xbib.event.Event; import org.xbib.event.Event;
import org.xbib.event.bus.EventBus;
import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManager;
import org.xbib.settings.Settings;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
@ -33,7 +31,11 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
private static final Logger logger = Logger.getLogger(FileFollowEventService.class.getName()); private static final Logger logger = Logger.getLogger(FileFollowEventService.class.getName());
private final EventBus eventBus; private final FileFollowEventManagerService fileFollowEventManagerService;
private final String eventType;
private final String name;
private final Path base; private final Path base;
@ -45,11 +47,14 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
private volatile boolean keepWatching; private volatile boolean keepWatching;
public FileFollowEventService(Settings settings, public FileFollowEventService(FileFollowEventManagerService fileFollowEventManagerService,
EventBus eventBus, String eventType,
String name,
Path base, Path base,
Pattern pattern) throws IOException { Pattern pattern) throws IOException {
this.eventBus = eventBus; this.fileFollowEventManagerService = fileFollowEventManagerService;
this.eventType = eventType;
this.name = name;
this.base = base; this.base = base;
this.pattern = pattern; this.pattern = pattern;
FileSystem fileSystem = base.getFileSystem(); FileSystem fileSystem = base.getFileSystem();
@ -58,7 +63,8 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
kinds[0] = StandardWatchEventKinds.ENTRY_MODIFY; kinds[0] = StandardWatchEventKinds.ENTRY_MODIFY;
base.register(watchService, kinds); base.register(watchService, kinds);
this.fileSizes = new LinkedHashMap<>(); this.fileSizes = new LinkedHashMap<>();
fillFileSizes(base, pattern); // initialize the sizes of existing files, so we can safely create first event
setFileSizes(base, pattern);
this.keepWatching = true; this.keepWatching = true;
} }
@ -87,15 +93,20 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
long currentSize = p.toFile().length(); long currentSize = p.toFile().length();
fileSizes.put(p, currentSize); fileSizes.put(p, currentSize);
String content = readRange(channel, lastSize, currentSize); String content = readRange(channel, lastSize, currentSize);
// split content by line, this allows pattern matching without preprocessing in worker // split file content by line
// this prevents swalloed events and allows pattern matching without preprocessing in worker
for (String line : content.split("\n")) { for (String line : content.split("\n")) {
Event event = EventManager.eventBuilder() if (fileFollowEventManagerService.getSuspended().contains(name)) {
.setType("filefollow") logger.log(Level.WARNING, name + " is suspended");
.setCode(base.toString()) } else {
.setPath(path) Event event = EventManager.eventBuilder()
.setMessage(line) .setType(eventType)
.build(); .setCode(base.toString())
eventBus.post(event); .setPath(path)
.setMessage(line)
.build();
fileFollowEventManagerService.publish(event);
}
} }
} }
} }
@ -134,7 +145,7 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
return charBuffer.toString(); return charBuffer.toString();
} }
private void fillFileSizes(Path base, Pattern pattern) throws IOException { private void setFileSizes(Path base, Pattern pattern) throws IOException {
if (!Files.exists(base)) { if (!Files.exists(base)) {
return; return;
} }

View file

@ -3,7 +3,7 @@ package org.xbib.event.path;
import org.xbib.datastructures.api.TimeValue; import org.xbib.datastructures.api.TimeValue;
import org.xbib.datastructures.json.tiny.Json; import org.xbib.datastructures.json.tiny.Json;
import org.xbib.event.Event; import org.xbib.event.Event;
import org.xbib.event.bus.EventBus; import org.xbib.event.common.AbstractEventManagerService;
import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManager;
import org.xbib.event.common.EventManagerService; import org.xbib.event.common.EventManagerService;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
@ -14,73 +14,102 @@ import java.io.Writer;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.ArrayList; import java.util.Collection;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import java.util.stream.Stream; import java.util.stream.Stream;
public class PathEventManagerService implements EventManagerService { public class PathEventManagerService extends AbstractEventManagerService implements EventManagerService {
private static final Logger logger = Logger.getLogger(PathEventManagerService.class.getName()); private static final Logger logger = Logger.getLogger(PathEventManagerService.class.getName());
private EventBus eventBus; private EventManager eventManager;
private ExecutorService executorService; private ExecutorService executorService;
private Map<Future<?>, PathEventService> eventServiceMap; private final Map<Future<?>, PathEventService> services;
private List<String> suspendedQueues;
public PathEventManagerService() { public PathEventManagerService() {
super();
this.services = new LinkedHashMap<>();
} }
@Override @Override
public PathEventManagerService init(EventManager eventManager) { public PathEventManagerService init(EventManager eventManager) {
Settings settings = eventManager.getSettings(); this.eventManager = eventManager;
this.eventBus = eventManager.getEventBus();
this.executorService = eventManager.getExecutorService(); this.executorService = eventManager.getExecutorService();
this.eventServiceMap = new LinkedHashMap<>(); for (Map.Entry<String, Settings> entry : eventManager.getSettings().getGroups("event.path").entrySet()) {
this.suspendedQueues = new ArrayList<>();
for (Map.Entry<String, Settings> entry : settings.getGroups("event.path").entrySet()) {
try { try {
String name = entry.getKey(); String name = entry.getKey();
Settings definition = entry.getValue(); Settings definition = entry.getValue();
if (definition.getAsBoolean("enabled", true)) { if (definition.getAsBoolean("enabled", true)) {
Path path = Paths.get(definition.get("path", "/var/tmp/" + name)); String pathName = Objects.requireNonNull(definition.get("path"), "path must not be null");
Path path = Paths.get(pathName);
TimeValue lifetime = definition.getAsTime("lifetime", TimeValue.timeValueHours(72)); TimeValue lifetime = definition.getAsTime("lifetime", TimeValue.timeValueHours(72));
String eventType = definition.get("type", "path"); String eventType = definition.get("type", "path");
createPathEventService(name, path, eventType, lifetime); createQueue(name, path);
PathEventService pathEventService = new PathEventService(this, name, path, eventType, lifetime);
add(pathEventService);
pathEventService.drainIncoming();
} else { } else {
logger.log(Level.WARNING, "path servive definition not enabled in configuration"); logger.log(Level.WARNING, "path service definition not enabled in configuration");
} }
} catch (Exception e) { } catch (IOException e) {
logger.log(Level.SEVERE, "unable to create event path service " + entry.getKey() + ", reason " + e.getMessage(), e); logger.log(Level.SEVERE, "unable to create event path service " + entry.getKey() + ", reason " + e.getMessage(), e);
} }
} }
logger.log(Level.INFO, "after init: event services = " + services);
return this; return this;
} }
public void createPathEventService(String name, Path path, String eventType, TimeValue lifetime) throws IOException { public void add(String name, Path path, String eventType) throws IOException{
createQueue(name, path); createQueue(name, path);
PathEventService pathEventService = new PathEventService(this, eventBus, name, path, eventType, lifetime); add(new PathEventService(this, name, path, eventType, TimeValue.timeValueHours(72)));
add(pathEventService); }
public void add(String name, Path path, String eventType, TimeValue lifetime) throws IOException{
createQueue(name, path);
add(new PathEventService(this, name, path, eventType, lifetime));
} }
public void add(PathEventService pathEventService) { public void add(PathEventService pathEventService) {
Future<?> future = executorService.submit(pathEventService); Future<?> future = executorService.submit(pathEventService);
eventServiceMap.put(future, pathEventService); services.put(future, pathEventService);
logger.log(Level.INFO, "path event service " + pathEventService + " added"); logger.log(Level.INFO, "path event service " + pathEventService + " added for path " + pathEventService.getPath());
}
public Collection<PathEventService> getPathEventServices() {
return services.values();
}
public PathEventService getPathEventService(String name) {
return services.values().stream().filter(p -> p.getName().equals(name)).findFirst().orElse(null);
}
public void publish(String eventType, Path path) {
Event event = EventManager.eventOf(eventType, null, null, path);
eventManager.publish(event);
}
public void failEvent(String eventType, Path path) {
try {
Event event = EventManager.eventOf(eventType, null, null, path);
event.fail();
} catch (IOException e) {
logger.log(Level.SEVERE, "unable to fail event because of " + e.getMessage(), e);
}
} }
@Override @Override
public void shutdown() throws IOException { public void shutdown() throws IOException {
logger.log(Level.INFO, "shut down all path event services"); logger.log(Level.INFO, "shutting down all path event services");
eventServiceMap.forEach((k, v) -> { services.forEach((k, v) -> {
k.cancel(true); k.cancel(true);
try { try {
v.close(); v.close();
@ -92,7 +121,7 @@ public class PathEventManagerService implements EventManagerService {
public void destroy() { public void destroy() {
logger.log(Level.INFO, "shutting down and destroying all path event files"); logger.log(Level.INFO, "shutting down and destroying all path event files");
eventServiceMap.forEach((k, v) -> { services.forEach((k, v) -> {
k.cancel(true); k.cancel(true);
try { try {
v.close(); v.close();
@ -103,31 +132,28 @@ public class PathEventManagerService implements EventManagerService {
}); });
} }
public List<String> getSuspendedQueues() { public boolean publishJsonIfNotExists(PathEventService pathEventService,
return suspendedQueues; String key,
} Map<String,Object> map) throws IOException {
Path path = pathEventService.getPath();
public void suspend(String queue) {
suspendedQueues.add(queue);
}
public void resume(String queue) {
suspendedQueues.remove(queue);
}
public boolean put(Path path, String key, Map<String,Object> map) throws IOException {
return put(path, key, ".json", Json.toString(map));
}
public boolean putIfNotExists(Path path, String key, Map<String,Object> map) throws IOException {
if (!exists(path, key, ".json")) { if (!exists(path, key, ".json")) {
return put(path, key, ".json", Json.toString(map)); return publishJson(pathEventService, key, map);
} else { } else {
return false; return false;
} }
} }
public boolean put(Path path, String key, String suffix, String string) throws IOException { public boolean publishJson(PathEventService pathEventService,
String key,
Map<String,Object> map) throws IOException {
return publishJson(pathEventService, key, ".json", Json.toString(map));
}
public boolean publishJson(PathEventService pathEventService,
String key,
String suffix,
String payload) throws IOException {
Path path = pathEventService.getPath();
String keyFileName = key + suffix; String keyFileName = key + suffix;
if (Files.exists(path.resolve(Event.INCOMING).resolve(keyFileName)) || if (Files.exists(path.resolve(Event.INCOMING).resolve(keyFileName)) ||
Files.exists(path.resolve(Event.SUCCESS).resolve(keyFileName))) { Files.exists(path.resolve(Event.SUCCESS).resolve(keyFileName))) {
@ -136,10 +162,10 @@ public class PathEventManagerService implements EventManagerService {
} }
Path p = path.resolve(Event.INCOMING).resolve(keyFileName); Path p = path.resolve(Event.INCOMING).resolve(keyFileName);
try (Writer writer = Files.newBufferedWriter(p)) { try (Writer writer = Files.newBufferedWriter(p)) {
writer.write(string); writer.write(payload);
} }
// obligatory purge. This is hacky. // obligatory purge. This is hacky.
eventServiceMap.forEach((k, v) -> { services.forEach((k, v) -> {
try { try {
v.purge(); v.purge();
} catch (IOException e) { } catch (IOException e) {
@ -149,12 +175,6 @@ public class PathEventManagerService implements EventManagerService {
return true; return true;
} }
public boolean exists(Path path, String key, String suffix) {
String keyFileName = key + suffix;
return Files.exists(path.resolve(Event.INCOMING).resolve(keyFileName)) ||
Files.exists(path.resolve(Event.SUCCESS).resolve(keyFileName));
}
public long sizeOfIncoming(Path path) throws IOException { public long sizeOfIncoming(Path path) throws IOException {
return sizeOf(path.resolve(Event.INCOMING)); return sizeOf(path.resolve(Event.INCOMING));
} }
@ -173,13 +193,19 @@ public class PathEventManagerService implements EventManagerService {
} }
} }
private static void createQueue(String name, Path p) throws IOException { private static boolean exists(Path path, String key, String suffix) {
logger.log(Level.FINE, "creating queue " + name + " at " + p); String keyFileName = key + suffix;
if (!Files.exists(p)) { return Files.exists(path.resolve(Event.INCOMING).resolve(keyFileName)) ||
Files.createDirectories(p); Files.exists(path.resolve(Event.SUCCESS).resolve(keyFileName));
}
private static void createQueue(String name, Path path) throws IOException {
logger.log(Level.FINE, "creating queue " + name + " at " + path);
if (!Files.exists(path)) {
Files.createDirectories(path);
} }
for (String s: List.of(Event.INCOMING, Event.SUCCESS, Event.FAIL)) { for (String s: List.of(Event.INCOMING, Event.SUCCESS, Event.FAIL)) {
Path dir = p.resolve(s); Path dir = path.resolve(s);
if (!Files.exists(dir)) { if (!Files.exists(dir)) {
logger.log(Level.FINE, "creating queue " + name + " dir " + dir); logger.log(Level.FINE, "creating queue " + name + " dir " + dir);
Files.createDirectories(dir); Files.createDirectories(dir);

View file

@ -4,8 +4,6 @@ import java.nio.file.FileVisitResult;
import java.nio.file.SimpleFileVisitor; import java.nio.file.SimpleFileVisitor;
import org.xbib.datastructures.api.TimeValue; import org.xbib.datastructures.api.TimeValue;
import org.xbib.event.Event; import org.xbib.event.Event;
import org.xbib.event.bus.EventBus;
import org.xbib.event.common.EventManager;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
@ -21,18 +19,17 @@ import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributeView; import java.nio.file.attribute.BasicFileAttributeView;
import java.nio.file.attribute.BasicFileAttributes; import java.nio.file.attribute.BasicFileAttributes;
import java.time.Instant; import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
public class PathEventService implements Callable<Integer>, Closeable { public class PathEventService implements Callable<Boolean>, Closeable {
private static final Logger logger = Logger.getLogger(PathEventService.class.getName()); private static final Logger logger = Logger.getLogger(PathEventService.class.getName());
private final PathEventManagerService pathEventManager; private final PathEventManagerService pathEventManager;
private final EventBus eventBus;
private final Path path; private final Path path;
private final String name; private final String name;
@ -43,20 +40,16 @@ public class PathEventService implements Callable<Integer>, Closeable {
private final WatchService watchService; private final WatchService watchService;
private int eventCount;
private volatile boolean keepWatching; private volatile boolean keepWatching;
public PathEventService(PathEventManagerService pathEventManager, public PathEventService(PathEventManagerService pathEventManager,
EventBus eventBus,
String name, String name,
Path path, Path path,
String eventType, String eventType,
TimeValue lifetime) throws IOException { TimeValue lifetime) throws IOException {
this.pathEventManager = pathEventManager; this.pathEventManager = pathEventManager;
this.eventBus = eventBus;
this.name = name; this.name = name;
this.path = path; this.path = Objects.requireNonNull(path, "path must not be null");
this.eventType = eventType; this.eventType = eventType;
this.lifetime = lifetime; this.lifetime = lifetime;
this.watchService = path.getFileSystem().newWatchService(); this.watchService = path.getFileSystem().newWatchService();
@ -67,9 +60,17 @@ public class PathEventService implements Callable<Integer>, Closeable {
logger.log(Level.INFO, "path event service created for files at " + path); logger.log(Level.INFO, "path event service created for files at " + path);
} }
public String getName() {
return name;
}
public Path getPath() {
return path;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public Integer call() { public Boolean call() {
try { try {
logger.log(Level.INFO, "watch service running on " + path.resolve(Event.INCOMING)); logger.log(Level.INFO, "watch service running on " + path.resolve(Event.INCOMING));
while (keepWatching && watchService != null) { while (keepWatching && watchService != null) {
@ -86,10 +87,10 @@ public class PathEventService implements Callable<Integer>, Closeable {
String watchEventContext = pathWatchEvent.context().toString(); String watchEventContext = pathWatchEvent.context().toString();
Path p = path.resolve(Event.INCOMING).resolve(watchEventContext); Path p = path.resolve(Event.INCOMING).resolve(watchEventContext);
logger.log(Level.FINE, "watch event " + pathWatchEvent + " context = " + watchEventContext + " path = " + p); logger.log(Level.FINE, "watch event " + pathWatchEvent + " context = " + watchEventContext + " path = " + p);
if (pathEventManager.getSuspendedQueues().contains(name)) { if (pathEventManager.getSuspended().contains(name)) {
failEvent(p); pathEventManager.failEvent(eventType, p);
} else { } else {
postEvent(p); pathEventManager.publish(eventType, p);
} }
} }
watchKey.reset(); watchKey.reset();
@ -102,7 +103,7 @@ public class PathEventService implements Callable<Integer>, Closeable {
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, e.getMessage(), e); logger.log(Level.SEVERE, e.getMessage(), e);
} }
return eventCount; return true;
} }
@Override @Override
@ -120,7 +121,7 @@ public class PathEventService implements Callable<Integer>, Closeable {
public void drainIncoming() throws IOException { public void drainIncoming() throws IOException {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path.resolve(Event.INCOMING))) { try (DirectoryStream<Path> stream = Files.newDirectoryStream(path.resolve(Event.INCOMING))) {
stream.forEach(this::postEvent); stream.forEach(p -> pathEventManager.publish(eventType, p));
} }
} }
@ -149,21 +150,6 @@ public class PathEventService implements Callable<Integer>, Closeable {
} }
} }
private void postEvent(Path path) {
Event event = EventManager.eventOf(eventType, null, null, path);
eventBus.post(event);
eventCount++;
}
private void failEvent(Path file) {
try {
Event event = EventManager.eventFromFile(file);
event.fail();
} catch (IOException e) {
logger.log(Level.SEVERE, "unable to fail event because of " + e.getMessage(), e);
}
}
private static void delete(Path path) { private static void delete(Path path) {
if (path == null) { if (path == null) {
return; return;

View file

@ -1,6 +1,6 @@
package org.xbib.event.persistence; package org.xbib.event.persistence;
import org.xbib.net.util.ExceptionFormatter; import org.xbib.event.util.ExceptionFormatter;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;

View file

@ -1,7 +1,8 @@
package org.xbib.event.timer; package org.xbib.event.timer;
import org.xbib.event.Payload; import org.xbib.event.Event;
import org.xbib.event.bus.EventBus; import org.xbib.event.common.PayloadImpl;
import org.xbib.event.common.AbstractEventManagerService;
import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManager;
import org.xbib.event.common.EventManagerService; import org.xbib.event.common.EventManagerService;
import org.xbib.event.persistence.FilePersistenceStore; import org.xbib.event.persistence.FilePersistenceStore;
@ -20,38 +21,53 @@ import java.util.Map;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
public class TimerEventManagerService implements EventManagerService { public class TimerEventManagerService extends AbstractEventManagerService implements EventManagerService {
private static final Logger logger = Logger.getLogger(TimerEventManagerService.class.getName()); private static final Logger logger = Logger.getLogger(TimerEventManagerService.class.getName());
private Map<String, TimerEventService> services; private EventManager eventManager;
private final Map<String, TimerEventService> services;
public TimerEventManagerService() { public TimerEventManagerService() {
super();
this.services = new LinkedHashMap<>();
} }
@Override @Override
public TimerEventManagerService init(EventManager eventManager) { public TimerEventManagerService init(EventManager eventManager) {
Settings settings = eventManager.getSettings(); this.eventManager = eventManager;
EventBus eventBus = eventManager.getEventBus(); for (Map.Entry<String, Settings> entry : eventManager.getSettings().getGroups("event.timer").entrySet()) {
this.services = new LinkedHashMap<>();
for (Map.Entry<String, Settings> entry : settings.getGroups("event.timer").entrySet()) {
String name = entry.getKey(); String name = entry.getKey();
Settings timerSettings = entry.getValue(); Settings timerSettings = entry.getValue();
try { try {
PersistenceStore<String, Object> persistenceStore = new FilePersistenceStore(timerSettings, name); PersistenceStore<String, Object> persistenceStore = new FilePersistenceStore(timerSettings, name);
TimerEventService timerEventService = new TimerEventService(eventBus, name, ZoneId.systemDefault(), persistenceStore); TimerEventService timerEventService = new TimerEventService(this, name, ZoneId.systemDefault(), persistenceStore);
services.put(name, timerEventService); services.put(name, timerEventService);
logger.log(Level.INFO, "timer " + name + " active: " + timerEventService); logger.log(Level.INFO, "timer " + name + " active: " + timerEventService);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.WARNING, "unable to activate timer " + name + ", reason: " + e.getMessage(), e); logger.log(Level.WARNING, "unable to activate timer " + name + ", reason: " + e.getMessage(), e);
} }
} }
logger.log(Level.INFO, "after init: event services = " + services);
return this; return this;
} }
public boolean publish(String name, @Override
String timeSpec, public void shutdown() throws IOException {
Payload payload) throws ParseException, IOException { for (Map.Entry<String, TimerEventService> entry : services.entrySet()) {
logger.log(Level.INFO, "closing timer " + entry.getKey());
entry.getValue().close();
}
}
public void publish(Event event) {
eventManager.publish(event);
}
public boolean schedule(String name,
String timeSpec,
PayloadImpl payload) throws ParseException, IOException {
if (services.containsKey(name)) { if (services.containsKey(name)) {
Span span = Chronic.parse(timeSpec); Span span = Chronic.parse(timeSpec);
if (span != null) { if (span != null) {
@ -69,9 +85,9 @@ public class TimerEventManagerService implements EventManagerService {
return false; return false;
} }
public boolean publish(String service, public boolean schedule(String service,
Instant instant, Instant instant,
Payload payload) throws IOException { PayloadImpl payload) throws IOException {
if (services.containsKey(service)) { if (services.containsKey(service)) {
services.get(service).schedule(instant, payload); services.get(service).schedule(instant, payload);
return true; return true;
@ -87,12 +103,4 @@ public class TimerEventManagerService implements EventManagerService {
entry.getValue().purge(); entry.getValue().purge();
} }
} }
@Override
public void shutdown() throws IOException {
for (Map.Entry<String, TimerEventService> entry : services.entrySet()) {
logger.log(Level.INFO, "closing timer " + entry.getKey());
entry.getValue().close();
}
}
} }

View file

@ -1,8 +1,7 @@
package org.xbib.event.timer; package org.xbib.event.timer;
import org.xbib.event.Event; import org.xbib.event.Event;
import org.xbib.event.Payload; import org.xbib.event.common.PayloadImpl;
import org.xbib.event.bus.EventBus;
import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManager;
import org.xbib.event.persistence.PersistenceStore; import org.xbib.event.persistence.PersistenceStore;
@ -26,7 +25,9 @@ class TimerEventService implements Closeable {
private static final Logger logger = Logger.getLogger(TimerEventService.class.getName()); private static final Logger logger = Logger.getLogger(TimerEventService.class.getName());
private final EventBus eventBus; private final TimerEventManagerService timerEventManagerService;
private final String name;
private final ZoneId zoneId; private final ZoneId zoneId;
@ -34,11 +35,12 @@ class TimerEventService implements Closeable {
private final Timer timer; private final Timer timer;
public TimerEventService(EventBus eventBus, public TimerEventService(TimerEventManagerService timerEventManagerService,
String name, String name,
ZoneId zoneId, ZoneId zoneId,
PersistenceStore<String, Object> persistenceStore) throws IOException { PersistenceStore<String, Object> persistenceStore) throws IOException {
this.eventBus = eventBus; this.timerEventManagerService = timerEventManagerService;
this.name = name;
this.zoneId = zoneId; this.zoneId = zoneId;
this.persistenceStore = persistenceStore; this.persistenceStore = persistenceStore;
this.timer = new Timer(); this.timer = new Timer();
@ -46,7 +48,7 @@ class TimerEventService implements Closeable {
logger.log(Level.INFO, "timer event service " + name + " loaded and rescheduled, " + tasknum + " timer tasks"); logger.log(Level.INFO, "timer event service " + name + " loaded and rescheduled, " + tasknum + " timer tasks");
} }
void schedule(Instant instant, Payload payload) throws IOException { void schedule(Instant instant, PayloadImpl payload) throws IOException {
String scheduled = instant.atZone(zoneId).format(DateTimeFormatter.ISO_DATE_TIME); String scheduled = instant.atZone(zoneId).format(DateTimeFormatter.ISO_DATE_TIME);
payload.put("scheduled", scheduled); payload.put("scheduled", scheduled);
TimerEventTask timerEventTask = new TimerEventTask(payload); TimerEventTask timerEventTask = new TimerEventTask(payload);
@ -62,7 +64,7 @@ class TimerEventService implements Closeable {
persistenceStore.clear(); persistenceStore.clear();
persistenceStore.commit(); persistenceStore.commit();
for (Map<String, Object> task : tasks) { for (Map<String, Object> task : tasks) {
Payload payload = new Payload(task); PayloadImpl payload = new PayloadImpl(task);
ZonedDateTime scheduledDate = ZonedDateTime.parse((String) task.get("scheduled"), DateTimeFormatter.ISO_DATE_TIME); ZonedDateTime scheduledDate = ZonedDateTime.parse((String) task.get("scheduled"), DateTimeFormatter.ISO_DATE_TIME);
if (scheduledDate.isBefore(ZonedDateTime.now())) { if (scheduledDate.isBefore(ZonedDateTime.now())) {
logger.log(Level.WARNING, "scheduled timer task " + task + " date already passed"); logger.log(Level.WARNING, "scheduled timer task " + task + " date already passed");
@ -87,9 +89,9 @@ class TimerEventService implements Closeable {
public class TimerEventTask extends TimerTask { public class TimerEventTask extends TimerTask {
private final Payload payload; private final PayloadImpl payload;
public TimerEventTask(Payload payload) throws IOException { public TimerEventTask(PayloadImpl payload) throws IOException {
this.payload = payload; this.payload = payload;
persistenceStore.insert("tasks", this.payload); persistenceStore.insert("tasks", this.payload);
} }
@ -97,12 +99,16 @@ class TimerEventService implements Closeable {
@Override @Override
public void run() { public void run() {
try { try {
if (timerEventManagerService.getSuspended().contains(name)) {
logger.log(Level.FINE, "timer event " + name + " suspended");
return;
}
Event timerEvent = EventManager.eventBuilder() Event timerEvent = EventManager.eventBuilder()
.setType("timer") .setType("timer")
.setCode(name)
.setPayload(payload) .setPayload(payload)
.build(); .build();
logger.log(Level.FINE, "posting timer event " + timerEvent.getClass().getName() + " with payload = " + payload); timerEventManagerService.publish(timerEvent);
eventBus.post(timerEvent);
logger.log(Level.FINE, "persistence before remove: " + persistenceStore.get("tasks")); logger.log(Level.FINE, "persistence before remove: " + persistenceStore.get("tasks"));
if (persistenceStore.remove("tasks", this.payload)) { if (persistenceStore.remove("tasks", this.payload)) {
logger.log(Level.FINE, "removal done"); logger.log(Level.FINE, "removal done");

View file

@ -0,0 +1,56 @@
package org.xbib.event.util;
import java.io.PrintWriter;
import java.io.StringWriter;
/**
* Format exception messages and stack traces.
*/
public final class ExceptionFormatter {
private ExceptionFormatter() {
}
/**
* Format exception with stack trace.
*
* @param t the thrown object
* @return the formatted exception
*/
public static String format(Throwable t) {
StringBuilder sb = new StringBuilder();
append(sb, t, 0, true);
return sb.toString();
}
/**
* Append Exception to string builder.
*/
private static void append(StringBuilder sb, Throwable t, int level, boolean details) {
if (((t != null) && (t.getMessage() != null)) && (!t.getMessage().isEmpty())) {
if (details && (level > 0)) {
sb.append("\n\nCaused by\n");
}
sb.append(t.getMessage());
}
if (details) {
if (t != null) {
if ((t.getMessage() != null) && (t.getMessage().isEmpty())) {
sb.append("\n\nCaused by ");
} else {
sb.append("\n\n");
}
}
StringWriter sw = new StringWriter();
if (t != null) {
t.printStackTrace(new PrintWriter(sw));
}
sb.append(sw.toString());
}
if (t != null) {
if (t.getCause() != null) {
append(sb, t.getCause(), level + 1, details);
}
}
}
}

View file

@ -13,13 +13,15 @@ public class ClockEventManagerTest {
TestClockEventConsumer clockEventConsumer = new TestClockEventConsumer(); TestClockEventConsumer clockEventConsumer = new TestClockEventConsumer();
Settings settings = Settings.settingsBuilder() Settings settings = Settings.settingsBuilder()
.put("event.clock.testclockevent.enabled", "true") .put("event.clock.testclockevent.enabled", "true")
.put("event.clock.testclockevent.type", "test-clock-event")
.put("event.clock.testclockevent.entry", "*/1 6-21 * * *") .put("event.clock.testclockevent.entry", "*/1 6-21 * * *")
.build(); .build();
EventManager eventManager = EventManager.builder() EventManager eventManager = EventManager.builder()
.setSettings(settings) .setSettings(settings)
.register("test-clock-event", TestClockEvent.class)
.register(clockEventConsumer) .register(clockEventConsumer)
.build(); .build();
Thread.sleep(90000L); Thread.sleep(90000L);
eventManager.close(); eventManager.shutdown();
} }
} }

View file

@ -0,0 +1,11 @@
package org.xbib.event.clock;
import org.xbib.event.common.ClockEventImpl;
import org.xbib.event.common.EventManager;
public class TestClockEvent extends ClockEventImpl {
public TestClockEvent(EventManager.EventBuilder builder) {
super(builder);
}
}

View file

@ -1,6 +1,5 @@
package org.xbib.event.clock; package org.xbib.event.clock;
import org.xbib.event.ClockEvent;
import org.xbib.event.EventConsumer; import org.xbib.event.EventConsumer;
import org.xbib.event.bus.AllowConcurrentEvents; import org.xbib.event.bus.AllowConcurrentEvents;
import org.xbib.event.bus.Subscribe; import org.xbib.event.bus.Subscribe;
@ -15,7 +14,8 @@ public class TestClockEventConsumer implements EventConsumer {
@Subscribe @Subscribe
@AllowConcurrentEvents @AllowConcurrentEvents
void onEvent(ClockEvent event) { void onEvent(TestClockEvent event) {
logger.log(Level.INFO, "received clock event on " + Instant.now() + " event instant = " + event.getCreated()); logger.log(Level.INFO, "received test clock event on " + Instant.now() +
" event instant = " + event.getInstant());
} }
} }

View file

@ -1,11 +1,9 @@
package org.xbib.event.generic; package org.xbib.event.common;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.xbib.event.Event; import org.xbib.event.Event;
import org.xbib.event.EventConsumer; import org.xbib.event.EventConsumer;
import org.xbib.event.bus.Subscribe; import org.xbib.event.bus.Subscribe;
import org.xbib.event.common.EventManager;
import org.xbib.event.common.GenericEventImpl;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
@ -32,7 +30,7 @@ public class GenericEventManagerTest {
.setType("generic") .setType("generic")
.setListener(e -> logger.log(Level.INFO, "received event " + e)) .setListener(e -> logger.log(Level.INFO, "received event " + e))
.build(); .build();
eventManager.getGenericEventManagerService().post(event); eventManager.publish(event);
// we must wait for a certain time because we do not use a future // we must wait for a certain time because we do not use a future
Thread.sleep(500L); Thread.sleep(500L);
assertEquals(1, consumer.getCount()); assertEquals(1, consumer.getCount());
@ -52,7 +50,7 @@ public class GenericEventManagerTest {
future.complete(e); future.complete(e);
}) })
.build(); .build();
eventManager.getGenericEventManagerService().post(event); eventManager.publish(event);
Event e = future.get(1000L, TimeUnit.MILLISECONDS); Event e = future.get(1000L, TimeUnit.MILLISECONDS);
assertNotNull(e); assertNotNull(e);
} }
@ -76,7 +74,7 @@ public class GenericEventManagerTest {
future.complete(e); future.complete(e);
}) })
.build(); .build();
eventManager.getGenericEventManagerService().post((GenericEventImpl) event, future); eventManager.publish((GenericEventImpl) event, future);
Event e = future.get(5L, TimeUnit.SECONDS); Event e = future.get(5L, TimeUnit.SECONDS);
if (e != null) { if (e != null) {
logger.log(Level.INFO, "the event " + e + " was received by all consumers"); logger.log(Level.INFO, "the event " + e + " was received by all consumers");

View file

@ -21,11 +21,13 @@ public class FileFollowEventManagerTest {
TestFileFollowEventConsumer consumer = new TestFileFollowEventConsumer(); TestFileFollowEventConsumer consumer = new TestFileFollowEventConsumer();
Settings settings = Settings.settingsBuilder() Settings settings = Settings.settingsBuilder()
.put("event.filefollow.testfilefollowevent.enabled", "true") .put("event.filefollow.testfilefollowevent.enabled", "true")
.put("event.filefollow.testfilefollowevent.type", "filefollow-test")
.put("event.filefollow.testfilefollowevent.base", path.toString()) .put("event.filefollow.testfilefollowevent.base", path.toString())
.put("event.filefollow.testfilefollowevent.pattern", ".*") .put("event.filefollow.testfilefollowevent.pattern", ".*")
.build(); .build();
EventManager eventManager = EventManager.builder() EventManager eventManager = EventManager.builder()
.setSettings(settings) .setSettings(settings)
.register("filefollow-test", TestFileFollowEvent.class)
.register(consumer) .register(consumer)
.build(); .build();
Thread.sleep(1000L); Thread.sleep(1000L);
@ -37,6 +39,6 @@ public class FileFollowEventManagerTest {
Thread.sleep(1000L); Thread.sleep(1000L);
Files.delete(testTxt); Files.delete(testTxt);
Files.delete(path); Files.delete(path);
eventManager.close(); eventManager.shutdown();
} }
} }

View file

@ -1,7 +1,11 @@
package org.xbib.event.path; package org.xbib.event.path;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.xbib.datastructures.api.TimeValue;
import org.xbib.event.EventConsumer;
import org.xbib.event.PathEvent; import org.xbib.event.PathEvent;
import org.xbib.event.bus.AllowConcurrentEvents;
import org.xbib.event.bus.Subscribe;
import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManager;
import org.xbib.event.common.PathEventImpl; import org.xbib.event.common.PathEventImpl;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
@ -10,6 +14,7 @@ import java.io.BufferedWriter;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Map;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -31,10 +36,11 @@ public class PathEventManagerTest {
.build(); .build();
Path testTxt = path.resolve("incoming").resolve("test.txt"); Path testTxt = path.resolve("incoming").resolve("test.txt");
try (BufferedWriter bufferedWriter = Files.newBufferedWriter(testTxt)) { try (BufferedWriter bufferedWriter = Files.newBufferedWriter(testTxt)) {
bufferedWriter.write("Hello"); bufferedWriter.write("{\"Hello\":\"world\"}");
} }
Thread.sleep(2000L); Thread.sleep(1000L);
eventManager.close(); eventManager.shutdown();
// extra destroy to clean up test
eventManager.getPathEventManagerService().destroy(); eventManager.getPathEventManagerService().destroy();
} }
@ -44,8 +50,8 @@ public class PathEventManagerTest {
TestPathEventConsumer consumer = new TestPathEventConsumer(); TestPathEventConsumer consumer = new TestPathEventConsumer();
Settings settings = Settings.settingsBuilder() Settings settings = Settings.settingsBuilder()
.put("event.path.testpathevent.enabled", "true") .put("event.path.testpathevent.enabled", "true")
.put("event.path.testpathevent.path", path.toString())
.put("event.path.testpathevent.type", "path-ext") .put("event.path.testpathevent.type", "path-ext")
.put("event.path.testpathevent.path", path.toString())
.build(); .build();
EventManager eventManager = EventManager.builder() EventManager eventManager = EventManager.builder()
.setSettings(settings) .setSettings(settings)
@ -54,10 +60,32 @@ public class PathEventManagerTest {
.build(); .build();
Path testTxt = path.resolve("incoming").resolve("test.txt"); Path testTxt = path.resolve("incoming").resolve("test.txt");
try (BufferedWriter bufferedWriter = Files.newBufferedWriter(testTxt)) { try (BufferedWriter bufferedWriter = Files.newBufferedWriter(testTxt)) {
bufferedWriter.write("Hello"); bufferedWriter.write("{\"Hello\":\"world\"}");
} }
Thread.sleep(2000L); Thread.sleep(1000L);
eventManager.close(); eventManager.shutdown();
// extra destroy to clean up test
eventManager.getPathEventManagerService().destroy();
}
@Test
public void testPathEventByApi() throws Exception {
TestPathExtEventConsumer consumer = new TestPathExtEventConsumer();
EventManager eventManager = EventManager.builder()
.register("path-ext", PathExtEvent.class)
.register(consumer)
.build();
// create by API
Path path = Files.createTempDirectory("testpath");
eventManager.getPathEventManagerService().add("test", path, "path-ext", TimeValue.timeValueHours(72));
// publish
PathEventService pathEventService = eventManager.getPathEventManagerService().getPathEventService("test");
eventManager.getPathEventManagerService()
.publishJson(pathEventService, "key", Map.of("hello", "world"));
// everything done
Thread.sleep(1000L);
eventManager.shutdown();
// extra destroy to clean up test
eventManager.getPathEventManagerService().destroy(); eventManager.getPathEventManagerService().destroy();
} }
@ -68,4 +96,25 @@ public class PathEventManagerTest {
logger.log(Level.INFO, "I'm the path ext event"); logger.log(Level.INFO, "I'm the path ext event");
} }
} }
public static class TestPathEventConsumer implements EventConsumer {
@Subscribe
@AllowConcurrentEvents
void onEvent(PathEvent event) {
logger.log(Level.INFO, "received path event, path = " + event.getPath() +
" payload = " + event.getPayload());
}
}
public static class TestPathExtEventConsumer implements EventConsumer {
@Subscribe
@AllowConcurrentEvents
void onEvent(PathExtEvent event) {
logger.log(Level.INFO, "received path ext event, path = " + event.getPath() +
" payload = " + event.getPayload());
}
}
} }

View file

@ -0,0 +1,10 @@
package org.xbib.event.path;
import org.xbib.event.common.EventManager;
import org.xbib.event.common.FileFollowEventImpl;
public class TestFileFollowEvent extends FileFollowEventImpl {
public TestFileFollowEvent(EventManager.EventBuilder builder) {
super(builder);
}
}

View file

@ -1,7 +1,6 @@
package org.xbib.event.path; package org.xbib.event.path;
import org.xbib.event.EventConsumer; import org.xbib.event.EventConsumer;
import org.xbib.event.FileFollowEvent;
import org.xbib.event.bus.AllowConcurrentEvents; import org.xbib.event.bus.AllowConcurrentEvents;
import org.xbib.event.bus.Subscribe; import org.xbib.event.bus.Subscribe;
@ -14,7 +13,7 @@ public class TestFileFollowEventConsumer implements EventConsumer {
@Subscribe @Subscribe
@AllowConcurrentEvents @AllowConcurrentEvents
void onEvent(FileFollowEvent event) { void onEvent(TestFileFollowEvent event) {
logger.log(Level.INFO, "received filefollow event, path = " + event.getPath() + " content = " + event.getMessage()); logger.log(Level.INFO, "received test filefollow event, path = " + event.getPath() + " content = " + event.getMessage());
} }
} }

View file

@ -16,7 +16,7 @@ public class TestTimerEventConsumer implements EventConsumer {
@Subscribe @Subscribe
@AllowConcurrentEvents @AllowConcurrentEvents
void onEvent(TimerEvent event) { void onEvent(TimerEvent event) {
logger.log(Level.INFO, "received timer event on " + Instant.now() + " event instant = " + event.getCreated()); logger.log(Level.INFO, "received timer event on " + Instant.now() + " event instant = " + event.getInstant());
} }
} }

View file

@ -1,7 +1,7 @@
package org.xbib.event.timer; package org.xbib.event.timer;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.xbib.event.Payload; import org.xbib.event.common.PayloadImpl;
import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManager;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
@ -22,8 +22,8 @@ public class TimerEventManagerTest {
.register(consumer) .register(consumer)
.build(); .build();
TimerEventManagerService timerEventManager = eventManager.getTimerEventManagerService(); TimerEventManagerService timerEventManager = eventManager.getTimerEventManagerService();
Payload payload = new Payload(Map.of("a", "b")); PayloadImpl payload = new PayloadImpl(Map.of("a", "b"));
timerEventManager.publish("testtimerevent", Instant.now().plusSeconds(5L), payload); timerEventManager.schedule("testtimerevent", Instant.now().plusSeconds(5L), payload);
Thread.sleep(10000L); Thread.sleep(10000L);
timerEventManager.shutdown(); timerEventManager.shutdown();
} }

View file

@ -0,0 +1,3 @@
dependencies {
implementation libs.jna
}

View file

@ -0,0 +1,421 @@
package org.xbib.event.journald;
public class DefaultJournalEntry implements JournalEntry {
private String message;
private String messageId;
private int priority;
private String codeFile;
private String codeLine;
private String codeFunc;
private String errno;
private String syslogFacility;
private String syslogIdentifier;
private String syslogPid;
private String syslogTimestamp;
private String syslogRaw;
private String pid;
private String uid;
private String gid;
private String comm;
private String exe;
private String cmdLine;
private String capEffective;
private String auditSession;
private String auditLoginUid;
private String systemdCgroup;
private String systemdSlice;
private String systemdUnit;
private String systemdUserSlice;
private String systemdUserUnit;
private String systemdSession;
private String systemdOwnerUid;
private String selinuxContext;
private String sourceRealtimeTimestamp;
private String bootId;
private String machineId;
private String systemdInvocationId;
private String hostname;
private String transport;
private String streamId;
private String lineBreak;
public void setMessage(String message) {
this.message = message;
}
@Override
public String getMessage() {
return message;
}
public void setMessageId(String messageId) {
this.messageId = messageId;
}
@Override
public String getMessageId() {
return messageId;
}
public void setPriority(int priority) {
this.priority = priority;
}
@Override
public int getPriority() {
return priority;
}
public void setCodeFile(String codeFile) {
this.codeFile = codeFile;
}
@Override
public String getCodeFile() {
return codeFile;
}
public void setCodeLine(String codeLine) {
this.codeLine = codeLine;
}
@Override
public String getCodeLine() {
return codeLine;
}
public void setCodeFunc(String codeFunc) {
this.codeFunc = codeFunc;
}
@Override
public String getCodeFunc() {
return codeFunc;
}
public void setErrno(String errno) {
this.errno = errno;
}
@Override
public String getErrno() {
return errno;
}
public void setSyslogFacility(String syslogFacility) {
this.syslogFacility = syslogFacility;
}
@Override
public String getSyslogFacility() {
return syslogFacility;
}
public void setSyslogIdentifier(String syslogIdentifier) {
this.syslogIdentifier = syslogIdentifier;
}
@Override
public String getSyslogIdentifier() {
return syslogIdentifier;
}
public void setSyslogPid(String syslogPid) {
this.syslogPid = syslogPid;
}
@Override
public String getSyslogPid() {
return syslogPid;
}
public void setSyslogTimestamp(String syslogTimestamp) {
this.syslogTimestamp = syslogTimestamp;
}
@Override
public String getSyslogTimestamp() {
return syslogTimestamp;
}
public void setSyslogRaw(String syslogRaw) {
this.syslogRaw = syslogRaw;
}
@Override
public String getSyslogRaw() {
return syslogRaw;
}
public void setPid(String pid) {
this.pid = pid;
}
@Override
public String getPid() {
return pid;
}
public void setUid(String uid) {
this.uid = uid;
}
@Override
public String getUid() {
return uid;
}
public void setGid(String gid) {
this.gid = gid;
}
@Override
public String getGid() {
return gid;
}
public void setComm(String comm) {
this.comm = comm;
}
@Override
public String getComm() {
return comm;
}
public void setExe(String exe) {
this.exe = exe;
}
@Override
public String getExe() {
return exe;
}
public void setCmdLine(String cmdline) {
this.cmdLine = cmdline;
}
@Override
public String getCmdLine() {
return cmdLine;
}
public void setCapEffective(String capEffective) {
this.capEffective = capEffective;
}
@Override
public String getCapEffective() {
return capEffective;
}
public void setAuditSession(String auditSession) {
this.auditSession = auditSession;
}
@Override
public String getAuditSession() {
return auditSession;
}
public void setAuditLoginUid(String auditLoginUid) {
this.auditLoginUid = auditLoginUid;
}
@Override
public String getAuditLoginUid() {
return auditLoginUid;
}
public void setSystemdCgroup(String systemdCgroup) {
this.systemdCgroup = systemdCgroup;
}
@Override
public String getSystemdCgroup() {
return systemdCgroup;
}
public void setSystemdSlice(String systemdSlice) {
this.systemdSlice = systemdSlice;
}
@Override
public String getSystemdSlice() {
return systemdSlice;
}
public void setSystemdUnit(String systemdUnit) {
this.systemdUnit = systemdUnit;
}
@Override
public String getSystemdUnit() {
return systemdUnit;
}
public void setSystemdUserSlice(String systemdUserSlice) {
this.systemdUserSlice = systemdUserSlice;
}
@Override
public String getSystemdUserSlice() {
return systemdUserSlice;
}
public void setSystemdUserUnit(String systemdUserUnit) {
this.systemdUserUnit = systemdUserUnit;
}
@Override
public String getSystemdUserUnit() {
return systemdUserUnit;
}
public void setSystemdSession(String systemdSession) {
this.systemdSession = systemdSession;
}
@Override
public String getSystemdSession() {
return systemdSession;
}
public void setSystemdOwnerUid(String systemdOwnerUid) {
this.systemdOwnerUid = systemdOwnerUid;
}
@Override
public String getSystemdOwnerUid() {
return systemdOwnerUid;
}
public void setSelinuxContext(String selinuxContext) {
this.selinuxContext = selinuxContext;
}
@Override
public String getSelinuxContext() {
return selinuxContext;
}
public void setSourceRealtimeTimestamp(String sourceRealtimeTimestamp) {
this.sourceRealtimeTimestamp = sourceRealtimeTimestamp;
}
@Override
public String getSourceRealtimeTimestamp() {
return sourceRealtimeTimestamp;
}
public void setBootId(String bootId) {
this.bootId = bootId;
}
@Override
public String getBootId() {
return bootId;
}
public void setMachineId(String machineId) {
this.machineId = machineId;
}
@Override
public String getMachineId() {
return machineId;
}
public void setSystemdInvocationId(String systemdInvocationId) {
this.systemdInvocationId = systemdInvocationId;
}
@Override
public String getSystemdInvocationId() {
return systemdInvocationId;
}
public void setHostname(String hostname) {
this.hostname = hostname;
}
@Override
public String getHostname() {
return hostname;
}
public void setTransport(String transport) {
this.transport = transport;
}
@Override
public String getTransport() {
return transport;
}
public void setStreamId(String streamId) {
this.streamId = streamId;
}
@Override
public String getStreamId() {
return streamId;
}
public void setLineBreak(String lineBreak) {
this.lineBreak = lineBreak;
}
@Override
public String getLineBreak() {
return lineBreak;
}
@Override
public String getFieldValue(String fieldName) {
return null;
}
@Override
public String toString() {
return "priority=" + this.priority + ",message=" + this.message;
}
}

View file

@ -0,0 +1,80 @@
package org.xbib.event.journald;
public interface JournalEntry {
String getMessage();
String getMessageId();
int getPriority();
String getCodeFile();
String getCodeLine();
String getCodeFunc();
String getErrno();
String getSyslogFacility();
String getSyslogIdentifier();
String getSyslogPid();
String getSyslogTimestamp();
String getSyslogRaw();
String getPid();
String getUid();
String getGid();
String getComm();
String getExe();
String getCmdLine();
String getCapEffective();
String getAuditSession();
String getAuditLoginUid();
String getSystemdCgroup();
String getSystemdSlice();
String getSystemdUnit();
String getSystemdUserSlice();
String getSystemdUserUnit();
String getSystemdSession();
String getSystemdOwnerUid();
String getSelinuxContext();
String getSourceRealtimeTimestamp();
String getBootId();
String getMachineId();
String getSystemdInvocationId();
String getHostname();
String getTransport();
String getStreamId();
String getLineBreak();
String getFieldValue(String fieldName);
}

View file

@ -0,0 +1,240 @@
package org.xbib.event.journald;
import com.sun.jna.Memory;
import com.sun.jna.Native;
import com.sun.jna.Pointer;
import com.sun.jna.StringArray;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import static org.xbib.event.journald.SystemdLibraryInstance.SD_JOURNAL_LOCAL_ONLY;
import static org.xbib.event.journald.SystemdLibraryInstance.SD_JOURNAL_NOP;
import static org.xbib.event.journald.SystemdLibraryInstance.SD_JOURNAL_SYSTEM;
public class SystemdJournalConsumer implements Runnable {
private static final Logger logger = Logger.getLogger(SystemdJournalConsumer.class.getName());
private final String match;
private final String field;
private final SystemdJournalListener listener;
public SystemdJournalConsumer(String match, SystemdJournalListener listener) {
this(match, null, listener);
}
public SystemdJournalConsumer(String match, String field, SystemdJournalListener listener) {
this.match = match;
this.field = field;
this.listener = listener;
}
@Override
public void run() {
try {
loop();
} catch (Throwable e) {
logger.log(Level.SEVERE, e.getMessage(), e);
}
}
/**
* <a href="https://gist.github.com/portante/ff7fb429c6f973aab377f7bb77b0ffdb">...</a>
* @throws IOException if loop fails
*/
private void loop() throws IOException {
SystemdLibraryInstance api = SystemdLibraryInstance.getInstance();
SystemdLibrary.SdJournal sdJournal = new SystemdLibrary.SdJournal();
int rc = api.sd_journal_open(sdJournal, SD_JOURNAL_LOCAL_ONLY);
if (rc < 0) {
throw new IOException("error opening journal for read: " + rc);
}
logger.log(Level.INFO, "top fd = " + sdJournal.top_level_fd +
" path = " + sdJournal.path +
" prefix = " + sdJournal.prefix +
" namespace = " + sdJournal.namespace);
if (match != null) {
rc = api.sd_journal_add_match(sdJournal, match, match.length());
logger.log(Level.INFO, "add_match: " + rc);
if (rc < 0) {
throw new IOException("error in add_match: " + rc);
}
}
rc = api.sd_journal_get_fd(sdJournal);
logger.log(Level.INFO, "get_fd: " + rc);
if (rc < 0) {
throw new IOException("failed to get file descriptor: " + rc);
}
rc = api.sd_journal_seek_tail(sdJournal);
logger.log(Level.INFO, "seek_tail: " + rc);
if (rc < 0) {
throw new IOException("failed to seek to tail of journal");
}
rc = api.sd_journal_previous(sdJournal);
logger.log(Level.INFO, "previous: " + rc);
rc = api.sd_journal_next(sdJournal);
logger.log(Level.INFO, "next: " + rc);
String[] strings = new String[1];
StringArray cursor = new StringArray(strings);
rc = api.sd_journal_get_cursor(sdJournal, cursor);
logger.log(Level.INFO, "get_cursor: " + rc);
System.exit(1);
while (true) {
do {
rc = api.sd_journal_wait(sdJournal, -1L);
logger.log(Level.INFO, "wait rc = " + rc);
} while (rc == SD_JOURNAL_NOP);
while ((rc = api.sd_journal_next(sdJournal)) > 0) {
logger.log(Level.INFO, "next: " + rc);
if (field != null) {
Pointer dataPointer = new Memory(Native.POINTER_SIZE);
Pointer sizePointer = new Memory(Native.POINTER_SIZE);
rc = api.sd_journal_get_data(sdJournal, field, dataPointer, sizePointer);
logger.log(Level.INFO, "get_data: " + rc);
if (rc != 0) {
throw new IOException("error in get_data: " + rc);
}
int size = sizePointer.getInt(0);
byte[] b = dataPointer.getByteArray(0, size);
String s = new String(b, StandardCharsets.UTF_8);
if (listener != null) {
listener.handleEntry(makeEntry(Collections.singletonList(s)));
}
} else {
String[] strings2 = new String[1];
StringArray nextCursor = new StringArray(strings2);
rc = api.sd_journal_get_cursor(sdJournal, nextCursor);
logger.log(Level.INFO, "get_cursor: " + rc);
if (!cursor.getString(0).equals(nextCursor.getString(0))) {
cursor = nextCursor;
Pointer dataPointer = new Memory(Native.POINTER_SIZE);
Pointer sizePointer = new Memory(Native.POINTER_SIZE);
List<String> list = new ArrayList<>();
while ((rc = api.sd_journal_enumerate_data(sdJournal, dataPointer, sizePointer)) > 0) {
logger.log(Level.INFO, "enumerate_data: " + rc);
int size = sizePointer.getInt(0);
byte[] b = dataPointer.getByteArray(0, size);
String s = new String(b, StandardCharsets.UTF_8);
list.add(s);
}
rc = api.sd_journal_restart_data(sdJournal);
logger.log(Level.INFO, "restart_data: " + rc);
if (listener != null) {
listener.handleEntry(makeEntry(list));
}
}
}
}
}
}
private JournalEntry makeEntry(List<String> list) {
DefaultJournalEntry journalEntry = new DefaultJournalEntry();
for (String string : list) {
if (string.startsWith("MESSAGE=")) {
journalEntry.setMessage(string.substring(8));
continue;
}
if (string.startsWith("MESSAGE_ID=")) {
journalEntry.setMessageId(string.substring(11));
continue;
}
if (string.startsWith("PRIORITY=")) {
journalEntry.setPriority(Integer.parseInt(string.substring(9)));
continue;
}
if (string.startsWith("CODE_FILE=")) {
journalEntry.setCodeFile(string.substring(10));
continue;
}
if (string.startsWith("CODE_LINE=")) {
journalEntry.setCodeLine(string.substring(10));
continue;
}
if (string.startsWith("CODE_FUNC=")) {
journalEntry.setCodeFunc(string.substring(10));
continue;
}
if (string.startsWith("ERRNO=")) {
journalEntry.setErrno(string.substring(6));
continue;
}
if (string.startsWith("SYSLOG_FACILITY=")) {
journalEntry.setSyslogFacility(string.substring(16));
continue;
}
if (string.startsWith("SYSLOG_IDENTIFIER=")) {
journalEntry.setSyslogIdentifier(string.substring(18));
continue;
}
if (string.startsWith("SYSLOG_PID=")) {
journalEntry.setSyslogPid(string.substring(11));
continue;
}
if (string.startsWith("SYSLOG_TIMESTAMP=")) {
journalEntry.setSyslogTimestamp(string.substring(17));
continue;
}
if (string.startsWith("SYSLOG_RAW=")) {
journalEntry.setSyslogRaw(string.substring(11));
continue;
}
if (string.startsWith("_PID=")) {
journalEntry.setPid(string.substring(5));
continue;
}
if (string.startsWith("_UID=")) {
journalEntry.setUid(string.substring(5));
continue;
}
if (string.startsWith("_GID=")) {
journalEntry.setGid(string.substring(5));
continue;
}
if (string.startsWith("_COMM=")) {
journalEntry.setComm(string.substring(6));
continue;
}
if (string.startsWith("_EXE=")) {
journalEntry.setExe(string.substring(5));
continue;
}
if (string.startsWith("_CMDLINE=")) {
journalEntry.setCmdLine(string.substring(9));
continue;
}
if (string.startsWith("_CAP_EFFECTIVE=")) {
journalEntry.setCapEffective(string.substring(15));
continue;
}
if (string.startsWith("_TRANSPORT=")) {
journalEntry.setTransport(string.substring(11));
continue;
}
if (string.startsWith("_SYSTEMD_OWNER_UID=")) {
journalEntry.setSystemdOwnerUid(string.substring(19));
continue;
}
if (string.startsWith("_SYSTEMD_UNIT=")) {
journalEntry.setSystemdUnit(string.substring(13));
continue;
}
if (string.startsWith("_SYSTEMD_USER_SLICE=")) {
journalEntry.setSystemdUserSlice(string.substring(19));
continue;
}
if (string.startsWith("_SYSTEMD_USER_UNIT=")) {
journalEntry.setSystemdUserUnit(string.substring(18));
}
}
return journalEntry;
}
}

View file

@ -0,0 +1,8 @@
package org.xbib.event.journald;
import java.io.IOException;
public interface SystemdJournalListener {
void handleEntry(JournalEntry entry) throws IOException;
}

View file

@ -0,0 +1,49 @@
package org.xbib.event.journald;
import com.sun.jna.Library;
import com.sun.jna.Pointer;
import com.sun.jna.StringArray;
import com.sun.jna.Structure;
import java.util.Arrays;
import java.util.List;
public interface SystemdLibrary extends Library {
int sd_notify(int fd, String msg);
int sd_journal_send(String format, Object... args);
int sd_journal_open(SdJournal sdJournal, int flag);
int sd_journal_get_fd(SdJournal sdJournal);
int sd_journal_seek_tail(SdJournal sdJournal);
int sd_journal_previous(SdJournal sdJournal);
int sd_journal_next(SdJournal sdJournal);
int sd_journal_get_cursor(SdJournal sdJournal, StringArray cursor);
int sd_journal_add_match(SdJournal sdJournal, String match, int len);
int sd_journal_wait(SdJournal sdJournal, long timeout);
int sd_journal_get_data(SdJournal sdJournal, String field, Pointer data, Pointer length);
int sd_journal_enumerate_data(SdJournal sdJournal, Pointer data, Pointer length);
int sd_journal_restart_data(SdJournal sdJournal);
class SdJournal extends Structure {
public int top_level_fd;
public String path;
public String prefix;
public String namespace;
@Override
protected List<String> getFieldOrder() {
return Arrays.asList("top_level_fd","path","prefix","namespace");
}
}
}

View file

@ -0,0 +1,105 @@
package org.xbib.event.journald;
import com.sun.jna.Native;
import com.sun.jna.Pointer;
import com.sun.jna.StringArray;
import java.util.List;
/**
* The systemd library API, loaded by Java Native Access (JNA).
*
* The native library is loaded only once, so this class is a singleton.
*/
public class SystemdLibraryInstance {
private static final SystemdLibraryInstance instance = new SystemdLibraryInstance();
private final SystemdLibrary systemdLibrary;
private SystemdLibraryInstance() {
this.systemdLibrary = loadLibrary();
}
public static SystemdLibraryInstance getInstance() {
return instance;
}
public static final int SD_JOURNAL_LOCAL_ONLY = 1;
public static final int SD_JOURNAL_RUNTIME_ONLY = 2;
public static final int SD_JOURNAL_SYSTEM = 4;
public static final int SD_JOURNAL_CURRENT_USER = 8;
public static final int SD_JOURNAL_NOP = 0;
public static final int SD_JOURNAL_APPEND = 1;
public static final int SD_JOURNAL_INVALIDATE = 2;
public static final String SD_ID128_FORMAT_STR = "%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x%02x";
public int sd_journal_send(String format, Object... args) {
return systemdLibrary.sd_journal_send(format, args);
}
public int sd_journal_send(String format, List<Object> args) {
return systemdLibrary.sd_journal_send(format, args.toArray());
}
public int sd_journal_open(SystemdLibrary.SdJournal sdJournal, int flag) {
return systemdLibrary.sd_journal_open(sdJournal, flag);
}
public int sd_journal_get_fd(SystemdLibrary.SdJournal sdJournal) {
return systemdLibrary.sd_journal_get_fd(sdJournal);
}
public int sd_journal_seek_tail(SystemdLibrary.SdJournal sdJournal) {
return systemdLibrary.sd_journal_seek_tail(sdJournal);
}
public int sd_journal_previous(SystemdLibrary.SdJournal sdJournal) {
return systemdLibrary.sd_journal_previous(sdJournal);
}
public int sd_journal_next(SystemdLibrary.SdJournal sdJournal) {
return systemdLibrary.sd_journal_next(sdJournal);
}
public int sd_journal_get_cursor(SystemdLibrary.SdJournal sdJournal, StringArray cursor) {
return systemdLibrary.sd_journal_get_cursor(sdJournal, cursor);
}
public int sd_journal_add_match(SystemdLibrary.SdJournal sdJournal, String match, int len) {
return systemdLibrary.sd_journal_add_match(sdJournal, match, len);
}
public int sd_journal_wait(SystemdLibrary.SdJournal sdJournal, long timeout_musec) {
return systemdLibrary.sd_journal_wait(sdJournal, timeout_musec);
}
public int sd_journal_get_data(SystemdLibrary.SdJournal sdJournal, String field, Pointer data, Pointer length) {
return systemdLibrary.sd_journal_get_data(sdJournal, field, data, length);
}
public int sd_journal_enumerate_data(SystemdLibrary.SdJournal sdJournal, Pointer data, Pointer length) {
return systemdLibrary.sd_journal_enumerate_data(sdJournal, data, length);
}
public int sd_journal_restart_data(SystemdLibrary.SdJournal sdJournal) {
return systemdLibrary.sd_journal_restart_data(sdJournal);
}
private static SystemdLibrary loadLibrary() {
try {
return Native.load("systemd", SystemdLibrary.class);
} catch (UnsatisfiedLinkError e) {
throw new RuntimeException("Failed to load systemd library." +
" Please note that JNA requires an executable temporary folder." +
" It can be explicitly defined with -Djna.tmpdir", e);
}
}
}

View file

@ -0,0 +1,26 @@
package org.xbib.event.journald.test;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledOnOs;
import org.junit.jupiter.api.condition.OS;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.xbib.event.journald.SystemdJournalConsumer;
@EnabledOnOs({OS.LINUX})
public class SystemdJournalReaderTest {
private static final Logger logger = Logger.getLogger(SystemdJournalReaderTest.class.getName());
@Test
void testConsumer() throws InterruptedException {
SystemdJournalConsumer consumer = new SystemdJournalConsumer(null,
entry -> logger.log(Level.INFO, entry.toString()));
Executors.newSingleThreadExecutor().submit(consumer);
// consuming for some seconds
Thread.sleep(60000L);
}
}

View file

@ -0,0 +1,6 @@
handlers=java.util.logging.ConsoleHandler
.level=ALL
java.util.logging.SimpleFormatter.format=%1$tY-%1$tm-%1$td %1$tH:%1$tM:%1$tS.%1$tL %4$-7s [%3$s] %5$s %6$s%n
java.util.logging.ConsoleHandler.level=ALL
java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter
jdk.event.security.level=INFO

View file

@ -1,7 +1,7 @@
dependencies { dependencies {
api project(':event-common') api project(':event-common')
api libs.net.http.server.netty api libs.net.http.server.netty.secure
api libs.net.http.client.netty api libs.net.http.client.netty.secure
implementation libs.settings.datastructures.json implementation libs.settings.datastructures.json
implementation libs.datastructures.json.tiny implementation libs.datastructures.json.tiny
} }

View file

@ -24,16 +24,16 @@ public class HttpEventReceiverService {
public HttpService createService(String prefix) { public HttpService createService(String prefix) {
return BaseHttpService.builder() return BaseHttpService.builder()
.setPrefix(prefix) .setPrefix(prefix)
.setPath("/event/{type}") .setPath("/event")
.setMethod(HttpMethod.POST) .setMethod(HttpMethod.POST)
.setHandler(ctx -> { .setHandler(ctx -> {
Event event = EventManager.eventFromJson(ctx.getRequest().asJson()); Event event = EventManager.eventFromJson(ctx.getRequest().asJson());
if (event.isNullEvent()) { if (event.isNullEvent()) {
ctx.status(NOT_FOUND).done(); ctx.status(NOT_FOUND).done();
} else { } else {
eventManager.dispatch(event); eventManager.publish(event);
ctx.status(OK) ctx.status(OK)
.header(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON) .setHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
.charset(StandardCharsets.UTF_8) .charset(StandardCharsets.UTF_8)
.body(event.toJson()) .body(event.toJson())
.done(); .done();

View file

@ -1,8 +1,12 @@
package org.xbib.event.net.http.test; package org.xbib.event.net.http.test;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.xbib.event.Event;
import org.xbib.event.Listener;
import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManager;
import org.xbib.event.common.GenericEventImpl;
import org.xbib.event.net.http.HttpEventManagerService; import org.xbib.event.net.http.HttpEventManagerService;
import org.xbib.net.NetworkClass; import org.xbib.net.NetworkClass;
import org.xbib.net.URL; import org.xbib.net.URL;
@ -22,17 +26,16 @@ import org.xbib.net.http.server.netty.NettyHttpServerConfig;
import org.xbib.net.http.server.route.BaseHttpRouter; import org.xbib.net.http.server.route.BaseHttpRouter;
import org.xbib.net.http.server.route.HttpRouter; import org.xbib.net.http.server.route.HttpRouter;
import org.xbib.net.http.server.service.BaseHttpService; import org.xbib.net.http.server.service.BaseHttpService;
import org.xbib.net.util.JsonUtil;
import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@Disabled("for now")
public class HttpEventTest { public class HttpEventTest {
private static final Logger logger = Logger.getLogger(HttpEventTest.class.getName()); private static final Logger logger = Logger.getLogger(HttpEventTest.class.getName());
@ -48,6 +51,7 @@ public class HttpEventTest {
nettyHttpServerConfig.setDebug(true); nettyHttpServerConfig.setDebug(true);
EventManager eventManager = EventManager.builder() EventManager eventManager = EventManager.builder()
.register("dummy", DummyEvent.class)
.build(); .build();
HttpEventManagerService httpEventManagerService = eventManager.getEventManagerService(HttpEventManagerService.class); HttpEventManagerService httpEventManagerService = eventManager.getEventManagerService(HttpEventManagerService.class);
@ -58,10 +62,12 @@ public class HttpEventTest {
.addService(BaseHttpService.builder() .addService(BaseHttpService.builder()
.setPath("/event") .setPath("/event")
.setHandler(ctx -> { .setHandler(ctx -> {
DummyEvent dummyEvent = EventManager.eventOf("dummy", DummyEvent.class);
dummyEvent.setListener(new DummyEventListener());
ctx.status(HttpResponseStatus.OK) ctx.status(HttpResponseStatus.OK)
.header(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN) .setHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN)
.charset(StandardCharsets.UTF_8) .charset(StandardCharsets.UTF_8)
.body(ctx.getRequest().asJson()) .body(dummyEvent.toJson())
.done(); .done();
}) })
.build()) .build())
@ -93,14 +99,9 @@ public class HttpEventTest {
" status = " + resp.getStatus() + " status = " + resp.getStatus() +
" header = " + resp.getHeaders() + " header = " + resp.getHeaders() +
" body = " + body); " body = " + body);
try { DummyEvent dummyEvent = (DummyEvent) EventManager.eventFromJson(body);
Map<String, Object> map = JsonUtil.toMap(body); assertNotNull(dummyEvent);
org.xbib.net.http.server.netty.HttpRequest httpRequest = org.xbib.net.http.server.netty.HttpRequest.builder() logger.log(Level.INFO, "dummy event transported = " + dummyEvent);
.parse(map).build();
logger.log(Level.INFO, "parsed http request = " + httpRequest.asJson());
} catch (IOException e) {
throw new RuntimeException(e);
}
received.set(true); received.set(true);
}) })
.build(); .build();
@ -109,4 +110,19 @@ public class HttpEventTest {
assertTrue(received.get()); assertTrue(received.get());
} }
} }
public static class DummyEvent extends GenericEventImpl {
public DummyEvent(EventManager.EventBuilder builder, Listener listener) {
super(builder, listener);
}
}
public static class DummyEventListener implements Listener {
@Override
public void listen(Event event) {
logger.log(Level.INFO, "got event " + event);
}
}
} }

View file

@ -0,0 +1,4 @@
org.xbib.net.http.server.netty.http1.Http1ChannelInitializer
org.xbib.net.http.server.netty.http2.Http2ChannelInitializer
org.xbib.net.http.server.netty.secure.http1.Https1ChannelInitializer
org.xbib.net.http.server.netty.secure.http2.Https2ChannelInitializer

View file

@ -1,3 +1,3 @@
group = org.xbib group = org.xbib
name = event name = event
version = 0.2.0 version = 0.4.0

Binary file not shown.

View file

@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-all.zip distributionUrl=https\://services.gradle.org/distributions/gradle-8.7-bin.zip
networkTimeout=10000 networkTimeout=10000
validateDistributionUrl=true validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME zipStoreBase=GRADLE_USER_HOME

20
gradlew.bat vendored
View file

@ -43,11 +43,11 @@ set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1 %JAVA_EXE% -version >NUL 2>&1
if %ERRORLEVEL% equ 0 goto execute if %ERRORLEVEL% equ 0 goto execute
echo. echo. 1>&2
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. 1>&2
echo. echo. 1>&2
echo Please set the JAVA_HOME variable in your environment to match the echo Please set the JAVA_HOME variable in your environment to match the 1>&2
echo location of your Java installation. echo location of your Java installation. 1>&2
goto fail goto fail
@ -57,11 +57,11 @@ set JAVA_EXE=%JAVA_HOME%/bin/java.exe
if exist "%JAVA_EXE%" goto execute if exist "%JAVA_EXE%" goto execute
echo. echo. 1>&2
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% 1>&2
echo. echo. 1>&2
echo Please set the JAVA_HOME variable in your environment to match the echo Please set the JAVA_HOME variable in your environment to match the 1>&2
echo location of your Java installation. echo location of your Java installation. 1>&2
goto fail goto fail

View file

@ -15,25 +15,24 @@ pluginManagement {
dependencyResolutionManagement { dependencyResolutionManagement {
versionCatalogs { versionCatalogs {
libs { libs {
version('gradle', '8.5') version('gradle', '8.7')
version('datastructures', '5.0.6') version('datastructures', '5.0.7')
version('net', '4.0.4') version('net-http', '4.5.0')
version('net-http', '4.1.0') version('netty', '4.1.109.Final')
version('netty', '4.1.104.Final')
library('netty-handler', 'io.netty', 'netty-handler').versionRef('netty') library('netty-handler', 'io.netty', 'netty-handler').versionRef('netty')
library('net', 'org.xbib', 'net').versionRef('net')
library('datastructures-common', 'org.xbib', 'datastructures-common').versionRef('datastructures') library('datastructures-common', 'org.xbib', 'datastructures-common').versionRef('datastructures')
library('datastructures-json-tiny', 'org.xbib', 'datastructures-json-tiny').versionRef('datastructures') library('datastructures-json-tiny', 'org.xbib', 'datastructures-json-tiny').versionRef('datastructures')
library('settings-api', 'org.xbib', 'settings-api').versionRef('datastructures') library('settings-api', 'org.xbib', 'settings-api').versionRef('datastructures')
library('settings-datastructures-json', 'org.xbib', 'settings-datastructures-json').versionRef('datastructures') library('settings-datastructures-json', 'org.xbib', 'settings-datastructures-json').versionRef('datastructures')
library('time', 'org.xbib', 'time').version('4.0.0') library('time', 'org.xbib', 'time').version('4.0.0')
library('net-http-server-netty', 'org.xbib', 'net-http-server-netty-secure').versionRef('net-http') library('net-http-server-netty-secure', 'org.xbib', 'net-http-server-netty-secure').versionRef('net-http')
library('net-http-client-netty', 'org.xbib', 'net-http-client-netty-secure').versionRef('net-http') library('net-http-client-netty-secure', 'org.xbib', 'net-http-client-netty-secure').versionRef('net-http')
library('reactivestreams', 'org.reactivestreams', 'reactive-streams').version('1.0.3') library('reactivestreams', 'org.reactivestreams', 'reactive-streams').version('1.0.3')
library('rxjava3', 'io.reactivex.rxjava3', 'rxjava').version('3.0.3') library('rxjava3', 'io.reactivex.rxjava3', 'rxjava').version('3.0.3')
library('jna', 'net.java.dev.jna', 'jna').version('5.14.0')
} }
testLibs { testLibs {
version('junit', '5.10.1') version('junit', '5.10.2')
library('junit-jupiter-api', 'org.junit.jupiter', 'junit-jupiter-api').versionRef('junit') library('junit-jupiter-api', 'org.junit.jupiter', 'junit-jupiter-api').versionRef('junit')
library('junit-jupiter-params', 'org.junit.jupiter', 'junit-jupiter-params').versionRef('junit') library('junit-jupiter-params', 'org.junit.jupiter', 'junit-jupiter-params').versionRef('junit')
library('junit-jupiter-engine', 'org.junit.jupiter', 'junit-jupiter-engine').versionRef('junit') library('junit-jupiter-engine', 'org.junit.jupiter', 'junit-jupiter-engine').versionRef('junit')
@ -46,6 +45,7 @@ dependencyResolutionManagement {
include 'event-api' include 'event-api'
include 'event-async' include 'event-async'
include 'event-common' include 'event-common'
include 'event-journald'
include 'event-loop' include 'event-loop'
include 'event-net-http' include 'event-net-http'
include 'event-pubsub' include 'event-pubsub'