make path events work with payload

This commit is contained in:
Jörg Prante 2024-03-20 17:28:34 +01:00
parent 17f919393f
commit 0b78c32b91
21 changed files with 346 additions and 161 deletions

View file

@ -13,7 +13,7 @@ ext {
user = 'joerg' user = 'joerg'
name = 'event' name = 'event'
description = 'Event framework for Java (NIO paths, files, timers, journals)' description = 'Event framework for Java (NIO paths, files, timers, journals)'
inceptionYear = '2021' inceptionYear = '2024'
url = 'https://xbib.org/' + user + '/' + name url = 'https://xbib.org/' + user + '/' + name
scmUrl = 'https://xbib.org/' + user + '/' + name scmUrl = 'https://xbib.org/' + user + '/' + name
scmConnection = 'scm:git:git://xbib.org/' + user + '/' + name + '.git' scmConnection = 'scm:git:git://xbib.org/' + user + '/' + name + '.git'
@ -28,20 +28,7 @@ subprojects {
apply from: rootProject.file('gradle/compile/java.gradle') apply from: rootProject.file('gradle/compile/java.gradle')
apply from: rootProject.file('gradle/test/junit5.gradle') apply from: rootProject.file('gradle/test/junit5.gradle')
apply from: rootProject.file('gradle/repositories/maven.gradle') apply from: rootProject.file('gradle/repositories/maven.gradle')
apply from: rootProject.file('gradle/publish/maven.gradle')
} }
apply from: rootProject.file('gradle/publish/sonatype.gradle') apply from: rootProject.file('gradle/publish/sonatype.gradle')
apply from: rootProject.file('gradle/publish/forgejo.gradle') apply from: rootProject.file('gradle/publish/forgejo.gradle')
/*
dependencies {
api libs.settings.api
implementation libs.net
implementation libs.time
implementation libs.datastructures.common
implementation libs.datastructures.json.tiny
implementation libs.netty.handler
implementation libs.reactivestreams
testImplementation libs.rxjava3
testImplementation libs.settings.datastructures.json
}
*/

View file

@ -0,0 +1,8 @@
package org.xbib.event;
public interface GenericEvent extends Event {
GenericEvent setListener(Listener listener);
void received();
}

View file

@ -1,16 +1,6 @@
package org.xbib.event; package org.xbib.event;
import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
@SuppressWarnings("serial") public interface Payload extends Map<String, Object> {
public class Payload extends LinkedHashMap<String, Object> {
public Payload() {
super();
}
public Payload(Map<String, Object> map) {
super(map);
}
} }

View file

@ -1,7 +1,6 @@
dependencies { dependencies {
api project(':event-api') api project(':event-api')
api libs.settings.api api libs.settings.api
implementation libs.net
implementation libs.time implementation libs.time
implementation libs.settings.datastructures.json implementation libs.settings.datastructures.json
implementation libs.datastructures.common implementation libs.datastructures.common

View file

@ -1,6 +1,16 @@
import org.xbib.event.common.EventManagerService; import org.xbib.event.common.EventManagerService;
import org.xbib.event.EventConsumer;
import org.xbib.event.Event;
module org.xbib.event.common { module org.xbib.event.common {
requires java.logging;
requires org.xbib.event.api;
requires org.xbib.settings.api;
requires org.xbib.settings.datastructures.json;
requires org.xbib.time;
requires org.xbib.datastructures.common;
requires org.xbib.datastructures.tiny;
requires org.xbib.datastructures.json.tiny;
exports org.xbib.event.bus; exports org.xbib.event.bus;
exports org.xbib.event.clock; exports org.xbib.event.clock;
exports org.xbib.event.common; exports org.xbib.event.common;
@ -8,16 +18,9 @@ module org.xbib.event.common {
exports org.xbib.event.path; exports org.xbib.event.path;
exports org.xbib.event.persistence; exports org.xbib.event.persistence;
exports org.xbib.event.timer; exports org.xbib.event.timer;
exports org.xbib.event.util;
exports org.xbib.event.wal; exports org.xbib.event.wal;
uses Event;
uses EventConsumer;
uses EventManagerService; uses EventManagerService;
uses org.xbib.event.EventConsumer;
uses org.xbib.event.Event;
requires org.xbib.event.api;
requires org.xbib.settings.api;
requires org.xbib.settings.datastructures.json;
requires org.xbib.net;
requires org.xbib.time;
requires org.xbib.datastructures.common;
requires org.xbib.datastructures.json.tiny;
requires java.logging;
} }

View file

@ -16,7 +16,6 @@ import org.xbib.datastructures.json.tiny.JsonBuilder;
import org.xbib.event.Event; import org.xbib.event.Event;
import org.xbib.event.EventConsumer; import org.xbib.event.EventConsumer;
import org.xbib.event.Listener; import org.xbib.event.Listener;
import org.xbib.event.Payload;
import org.xbib.event.bus.AsyncEventBus; import org.xbib.event.bus.AsyncEventBus;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.EventBus;
import org.xbib.event.bus.Subscriber; import org.xbib.event.bus.Subscriber;
@ -76,6 +75,70 @@ public final class EventManager extends AbstractEventManagerService implements E
logger.log(Level.INFO, "installed event service managers = " + eventManagerServices.keySet()); logger.log(Level.INFO, "installed event service managers = " + eventManagerServices.keySet());
} }
@SuppressWarnings("unchecked")
public static <E extends Event> E eventOf(String eventType, Class<E> eventClass) {
return (E) eventBuilder()
.setType(eventType)
.build();
}
public static Event eventOf(String eventType,
String code,
String message,
Path path) {
return eventBuilder()
.setType(eventType)
.setCode(code)
.setMessage(message)
.setPath(path)
.setPayload(PayloadImpl.fromPath(path))
.build();
}
public static Event eventOf(String eventType,
Instant scheduled) {
return eventBuilder()
.setType(eventType)
.setScheduledFor(scheduled)
.build();
}
public static Event eventFromFile(Path file) throws IOException {
return eventFromJson(Files.readString(file));
}
@SuppressWarnings("unchecked")
public static Event eventFromJson(String json) {
Map<String, Object> map = Json.toMap(json);
EventBuilder builder = eventBuilder();
if (map.containsKey("type")) {
builder.setType(map.getOrDefault("type", "generic").toString());
}
if (map.containsKey("code")) {
builder.setCode(map.getOrDefault("code", "").toString());
}
if (map.containsKey("message")) {
builder.setMessage(map.getOrDefault("message", "").toString());
}
if (map.containsKey("created")) {
String created = map.getOrDefault("created", "").toString();
builder.setCreated(Instant.parse(created));
}
if (map.containsKey("scheduled")) {
String scheduled = map.getOrDefault("scheduled", "").toString();
builder.setCreated(Instant.parse(scheduled));
}
if (map.containsKey("payload")) {
PayloadImpl payload = new PayloadImpl((Map<String, Object>) map.get("payload"));
builder.setPayload(payload);
}
if (map.containsKey("path")) {
Path path = Paths.get((String) map.get("path"));
builder.setPath(path);
}
return builder.build();
}
public EventManagerService init(EventManager eventManager) { public EventManagerService init(EventManager eventManager) {
return this; return this;
} }
@ -287,62 +350,6 @@ public final class EventManager extends AbstractEventManagerService implements E
} }
} }
public static Event eventOf(String eventType,
String code,
String message,
Path path) {
return eventBuilder()
.setType(eventType)
.setCode(code)
.setMessage(message)
.setPath(path)
.build();
}
public static Event eventOf(String eventType,
Instant scheduled) {
return eventBuilder()
.setType(eventType)
.setScheduledFor(scheduled)
.build();
}
public static Event eventFromFile(Path file) throws IOException {
return eventFromJson(Files.readString(file));
}
@SuppressWarnings("unchecked")
public static Event eventFromJson(String json) {
Map<String, Object> map = Json.toMap(json);
EventBuilder builder = eventBuilder();
if (map.containsKey("type")) {
builder.setType(map.getOrDefault("type", "generic").toString());
}
if (map.containsKey("code")) {
builder.setCode(map.getOrDefault("code", "").toString());
}
if (map.containsKey("message")) {
builder.setMessage(map.getOrDefault("message", "").toString());
}
if (map.containsKey("created")) {
String created = map.getOrDefault("created", "").toString();
builder.setCreated(Instant.parse(created));
}
if (map.containsKey("scheduled")) {
String scheduled = map.getOrDefault("scheduled", "").toString();
builder.setCreated(Instant.parse(scheduled));
}
if (map.containsKey("payload")) {
Payload payload = new Payload((Map<String, Object>) map.get("payload"));
builder.setPayload(payload);
}
if (map.containsKey("path")) {
Path path = Paths.get((String) map.get("path"));
builder.setPath(path);
}
return builder.build();
}
public static class EventImpl implements Event { public static class EventImpl implements Event {
private final EventBuilder builder; private final EventBuilder builder;
@ -371,7 +378,7 @@ public final class EventManager extends AbstractEventManagerService implements E
} }
@Override @Override
public Payload getPayload() { public PayloadImpl getPayload() {
return builder.payload; return builder.payload;
} }
@ -412,7 +419,9 @@ public final class EventManager extends AbstractEventManagerService implements E
builder.fieldIfNotNull("type", getType()); builder.fieldIfNotNull("type", getType());
builder.fieldIfNotNull("code", getCode()); builder.fieldIfNotNull("code", getCode());
builder.fieldIfNotNull("message", getMessage()); builder.fieldIfNotNull("message", getMessage());
builder.buildKey("payload").buildMap(getPayload()); if (getPayload() != null && !getPayload().isEmpty()) {
builder.buildKey("payload").buildMap(getPayload());
}
builder.fieldIfNotNull("created", getCreated() != null ? getCreated().toString() : null); builder.fieldIfNotNull("created", getCreated() != null ? getCreated().toString() : null);
builder.fieldIfNotNull("scheduled", getScheduledFor() != null ? getScheduledFor().toString() : null); builder.fieldIfNotNull("scheduled", getScheduledFor() != null ? getScheduledFor().toString() : null);
builder.fieldIfNotNull("path", getPath() != null ? getPath().toAbsolutePath().toString() : null); builder.fieldIfNotNull("path", getPath() != null ? getPath().toAbsolutePath().toString() : null);
@ -477,7 +486,7 @@ public final class EventManager extends AbstractEventManagerService implements E
Instant scheduled; Instant scheduled;
Payload payload; PayloadImpl payload;
Path path; Path path;
@ -527,7 +536,7 @@ public final class EventManager extends AbstractEventManagerService implements E
return this; return this;
} }
public EventBuilder setPayload(Payload payload) { public EventBuilder setPayload(PayloadImpl payload) {
this.payload = payload; this.payload = payload;
return this; return this;
} }

View file

@ -1,28 +1,26 @@
package org.xbib.event.common; package org.xbib.event.common;
import org.xbib.event.GenericEvent;
import org.xbib.event.Listener; import org.xbib.event.Listener;
import java.util.Objects; import java.util.Objects;
public class GenericEventImpl extends EventManager.EventImpl { public class GenericEventImpl extends EventManager.EventImpl implements GenericEvent {
private final EventManager.EventBuilder builder; private final EventManager.EventBuilder builder;
public GenericEventImpl(EventManager.EventBuilder builder) { public GenericEventImpl(EventManager.EventBuilder builder) {
this(builder, null);
}
public GenericEventImpl(EventManager.EventBuilder builder, Listener listener) {
super(builder); super(builder);
this.builder = builder; this.builder = builder;
this.builder.listener = Objects.requireNonNull(listener);
} }
@Override
public GenericEventImpl setListener(Listener listener) { public GenericEventImpl setListener(Listener listener) {
this.builder.listener = Objects.requireNonNull(listener); this.builder.listener = Objects.requireNonNull(listener);
return this; return this;
} }
@Override
public void received() { public void received() {
builder.listener.listen(this); builder.listener.listen(this);
} }

View file

@ -0,0 +1,35 @@
package org.xbib.event.common;
import org.xbib.datastructures.json.tiny.Json;
import org.xbib.event.Payload;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;
@SuppressWarnings("serial")
public class PayloadImpl extends LinkedHashMap<String, Object> implements Payload {
public PayloadImpl() {
super();
}
public PayloadImpl(Map<String, Object> map) {
super(map);
}
public static PayloadImpl fromPath(Path path) {
PayloadImpl payload = new PayloadImpl();
try (InputStream inputStream = Files.newInputStream(path)) {
String content = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
payload.putAll(Json.toMap(content));
} catch (IOException e) {
throw new IllegalArgumentException("broken json content in path " + path);
}
return payload;
}
}

View file

@ -14,6 +14,7 @@ import java.io.Writer;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.util.Collection;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -58,17 +59,35 @@ public class PathEventManagerService extends AbstractEventManagerService impleme
} else { } else {
logger.log(Level.WARNING, "path service definition not enabled in configuration"); logger.log(Level.WARNING, "path service definition not enabled in configuration");
} }
} catch (Exception e) { } catch (IOException e) {
logger.log(Level.SEVERE, "unable to create event path service " + entry.getKey() + ", reason " + e.getMessage(), e); logger.log(Level.SEVERE, "unable to create event path service " + entry.getKey() + ", reason " + e.getMessage(), e);
} }
} }
return this; return this;
} }
public void add(String name, Path path, String eventType) throws IOException{
createQueue(name, path);
add(new PathEventService(this, name, path, eventType, TimeValue.timeValueHours(72)));
}
public void add(String name, Path path, String eventType, TimeValue lifetime) throws IOException{
createQueue(name, path);
add(new PathEventService(this, name, path, eventType, lifetime));
}
public void add(PathEventService pathEventService) { public void add(PathEventService pathEventService) {
Future<?> future = executorService.submit(pathEventService); Future<?> future = executorService.submit(pathEventService);
eventServiceMap.put(future, pathEventService); eventServiceMap.put(future, pathEventService);
logger.log(Level.INFO, "path event service " + pathEventService + " added"); logger.log(Level.INFO, "path event service " + pathEventService + " added for path " + pathEventService.getPath());
}
public Collection<PathEventService> getPathEventServices() {
return eventServiceMap.values();
}
public PathEventService getPathEventService(String name) {
return eventServiceMap.values().stream().filter(p -> p.getName().equals(name)).findFirst().orElse(null);
} }
public void publish(String eventType, Path path) { public void publish(String eventType, Path path) {
@ -111,19 +130,28 @@ public class PathEventManagerService extends AbstractEventManagerService impleme
}); });
} }
public boolean putIfNotExists(Path path, String key, Map<String,Object> map) throws IOException { public boolean publishJsonIfNotExists(PathEventService pathEventService,
String key,
Map<String,Object> map) throws IOException {
Path path = pathEventService.getPath();
if (!exists(path, key, ".json")) { if (!exists(path, key, ".json")) {
return put(path, key, map); return publishJson(pathEventService, key, map);
} else { } else {
return false; return false;
} }
} }
public boolean put(Path path, String key, Map<String,Object> map) throws IOException { public boolean publishJson(PathEventService pathEventService,
return put(path, key, ".json", Json.toString(map)); String key,
Map<String,Object> map) throws IOException {
return publishJson(pathEventService, key, ".json", Json.toString(map));
} }
public boolean put(Path path, String key, String suffix, String string) throws IOException { public boolean publishJson(PathEventService pathEventService,
String key,
String suffix,
String payload) throws IOException {
Path path = pathEventService.getPath();
String keyFileName = key + suffix; String keyFileName = key + suffix;
if (Files.exists(path.resolve(Event.INCOMING).resolve(keyFileName)) || if (Files.exists(path.resolve(Event.INCOMING).resolve(keyFileName)) ||
Files.exists(path.resolve(Event.SUCCESS).resolve(keyFileName))) { Files.exists(path.resolve(Event.SUCCESS).resolve(keyFileName))) {
@ -132,7 +160,7 @@ public class PathEventManagerService extends AbstractEventManagerService impleme
} }
Path p = path.resolve(Event.INCOMING).resolve(keyFileName); Path p = path.resolve(Event.INCOMING).resolve(keyFileName);
try (Writer writer = Files.newBufferedWriter(p)) { try (Writer writer = Files.newBufferedWriter(p)) {
writer.write(string); writer.write(payload);
} }
// obligatory purge. This is hacky. // obligatory purge. This is hacky.
eventServiceMap.forEach((k, v) -> { eventServiceMap.forEach((k, v) -> {
@ -145,12 +173,6 @@ public class PathEventManagerService extends AbstractEventManagerService impleme
return true; return true;
} }
public boolean exists(Path path, String key, String suffix) {
String keyFileName = key + suffix;
return Files.exists(path.resolve(Event.INCOMING).resolve(keyFileName)) ||
Files.exists(path.resolve(Event.SUCCESS).resolve(keyFileName));
}
public long sizeOfIncoming(Path path) throws IOException { public long sizeOfIncoming(Path path) throws IOException {
return sizeOf(path.resolve(Event.INCOMING)); return sizeOf(path.resolve(Event.INCOMING));
} }
@ -169,13 +191,19 @@ public class PathEventManagerService extends AbstractEventManagerService impleme
} }
} }
private static void createQueue(String name, Path p) throws IOException { private static boolean exists(Path path, String key, String suffix) {
logger.log(Level.FINE, "creating queue " + name + " at " + p); String keyFileName = key + suffix;
if (!Files.exists(p)) { return Files.exists(path.resolve(Event.INCOMING).resolve(keyFileName)) ||
Files.createDirectories(p); Files.exists(path.resolve(Event.SUCCESS).resolve(keyFileName));
}
private static void createQueue(String name, Path path) throws IOException {
logger.log(Level.FINE, "creating queue " + name + " at " + path);
if (!Files.exists(path)) {
Files.createDirectories(path);
} }
for (String s: List.of(Event.INCOMING, Event.SUCCESS, Event.FAIL)) { for (String s: List.of(Event.INCOMING, Event.SUCCESS, Event.FAIL)) {
Path dir = p.resolve(s); Path dir = path.resolve(s);
if (!Files.exists(dir)) { if (!Files.exists(dir)) {
logger.log(Level.FINE, "creating queue " + name + " dir " + dir); logger.log(Level.FINE, "creating queue " + name + " dir " + dir);
Files.createDirectories(dir); Files.createDirectories(dir);

View file

@ -19,6 +19,7 @@ import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributeView; import java.nio.file.attribute.BasicFileAttributeView;
import java.nio.file.attribute.BasicFileAttributes; import java.nio.file.attribute.BasicFileAttributes;
import java.time.Instant; import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -48,7 +49,7 @@ public class PathEventService implements Callable<Boolean>, Closeable {
TimeValue lifetime) throws IOException { TimeValue lifetime) throws IOException {
this.pathEventManager = pathEventManager; this.pathEventManager = pathEventManager;
this.name = name; this.name = name;
this.path = path; this.path = Objects.requireNonNull(path, "path must not be null");
this.eventType = eventType; this.eventType = eventType;
this.lifetime = lifetime; this.lifetime = lifetime;
this.watchService = path.getFileSystem().newWatchService(); this.watchService = path.getFileSystem().newWatchService();
@ -59,6 +60,14 @@ public class PathEventService implements Callable<Boolean>, Closeable {
logger.log(Level.INFO, "path event service created for files at " + path); logger.log(Level.INFO, "path event service created for files at " + path);
} }
public String getName() {
return name;
}
public Path getPath() {
return path;
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public Boolean call() { public Boolean call() {

View file

@ -1,6 +1,6 @@
package org.xbib.event.persistence; package org.xbib.event.persistence;
import org.xbib.net.util.ExceptionFormatter; import org.xbib.event.util.ExceptionFormatter;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;

View file

@ -1,7 +1,7 @@
package org.xbib.event.timer; package org.xbib.event.timer;
import org.xbib.event.Event; import org.xbib.event.Event;
import org.xbib.event.Payload; import org.xbib.event.common.PayloadImpl;
import org.xbib.event.common.AbstractEventManagerService; import org.xbib.event.common.AbstractEventManagerService;
import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManager;
import org.xbib.event.common.EventManagerService; import org.xbib.event.common.EventManagerService;
@ -67,7 +67,7 @@ public class TimerEventManagerService extends AbstractEventManagerService implem
public boolean schedule(String name, public boolean schedule(String name,
String timeSpec, String timeSpec,
Payload payload) throws ParseException, IOException { PayloadImpl payload) throws ParseException, IOException {
if (services.containsKey(name)) { if (services.containsKey(name)) {
Span span = Chronic.parse(timeSpec); Span span = Chronic.parse(timeSpec);
if (span != null) { if (span != null) {
@ -87,7 +87,7 @@ public class TimerEventManagerService extends AbstractEventManagerService implem
public boolean schedule(String service, public boolean schedule(String service,
Instant instant, Instant instant,
Payload payload) throws IOException { PayloadImpl payload) throws IOException {
if (services.containsKey(service)) { if (services.containsKey(service)) {
services.get(service).schedule(instant, payload); services.get(service).schedule(instant, payload);
return true; return true;

View file

@ -1,7 +1,7 @@
package org.xbib.event.timer; package org.xbib.event.timer;
import org.xbib.event.Event; import org.xbib.event.Event;
import org.xbib.event.Payload; import org.xbib.event.common.PayloadImpl;
import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManager;
import org.xbib.event.persistence.PersistenceStore; import org.xbib.event.persistence.PersistenceStore;
@ -48,7 +48,7 @@ class TimerEventService implements Closeable {
logger.log(Level.INFO, "timer event service " + name + " loaded and rescheduled, " + tasknum + " timer tasks"); logger.log(Level.INFO, "timer event service " + name + " loaded and rescheduled, " + tasknum + " timer tasks");
} }
void schedule(Instant instant, Payload payload) throws IOException { void schedule(Instant instant, PayloadImpl payload) throws IOException {
String scheduled = instant.atZone(zoneId).format(DateTimeFormatter.ISO_DATE_TIME); String scheduled = instant.atZone(zoneId).format(DateTimeFormatter.ISO_DATE_TIME);
payload.put("scheduled", scheduled); payload.put("scheduled", scheduled);
TimerEventTask timerEventTask = new TimerEventTask(payload); TimerEventTask timerEventTask = new TimerEventTask(payload);
@ -64,7 +64,7 @@ class TimerEventService implements Closeable {
persistenceStore.clear(); persistenceStore.clear();
persistenceStore.commit(); persistenceStore.commit();
for (Map<String, Object> task : tasks) { for (Map<String, Object> task : tasks) {
Payload payload = new Payload(task); PayloadImpl payload = new PayloadImpl(task);
ZonedDateTime scheduledDate = ZonedDateTime.parse((String) task.get("scheduled"), DateTimeFormatter.ISO_DATE_TIME); ZonedDateTime scheduledDate = ZonedDateTime.parse((String) task.get("scheduled"), DateTimeFormatter.ISO_DATE_TIME);
if (scheduledDate.isBefore(ZonedDateTime.now())) { if (scheduledDate.isBefore(ZonedDateTime.now())) {
logger.log(Level.WARNING, "scheduled timer task " + task + " date already passed"); logger.log(Level.WARNING, "scheduled timer task " + task + " date already passed");
@ -89,9 +89,9 @@ class TimerEventService implements Closeable {
public class TimerEventTask extends TimerTask { public class TimerEventTask extends TimerTask {
private final Payload payload; private final PayloadImpl payload;
public TimerEventTask(Payload payload) throws IOException { public TimerEventTask(PayloadImpl payload) throws IOException {
this.payload = payload; this.payload = payload;
persistenceStore.insert("tasks", this.payload); persistenceStore.insert("tasks", this.payload);
} }

View file

@ -0,0 +1,56 @@
package org.xbib.event.util;
import java.io.PrintWriter;
import java.io.StringWriter;
/**
* Format exception messages and stack traces.
*/
public final class ExceptionFormatter {
private ExceptionFormatter() {
}
/**
* Format exception with stack trace.
*
* @param t the thrown object
* @return the formatted exception
*/
public static String format(Throwable t) {
StringBuilder sb = new StringBuilder();
append(sb, t, 0, true);
return sb.toString();
}
/**
* Append Exception to string builder.
*/
private static void append(StringBuilder sb, Throwable t, int level, boolean details) {
if (((t != null) && (t.getMessage() != null)) && (!t.getMessage().isEmpty())) {
if (details && (level > 0)) {
sb.append("\n\nCaused by\n");
}
sb.append(t.getMessage());
}
if (details) {
if (t != null) {
if ((t.getMessage() != null) && (t.getMessage().isEmpty())) {
sb.append("\n\nCaused by ");
} else {
sb.append("\n\n");
}
}
StringWriter sw = new StringWriter();
if (t != null) {
t.printStackTrace(new PrintWriter(sw));
}
sb.append(sw.toString());
}
if (t != null) {
if (t.getCause() != null) {
append(sb, t.getCause(), level + 1, details);
}
}
}
}

View file

@ -1,7 +1,11 @@
package org.xbib.event.path; package org.xbib.event.path;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.xbib.datastructures.api.TimeValue;
import org.xbib.event.EventConsumer;
import org.xbib.event.PathEvent; import org.xbib.event.PathEvent;
import org.xbib.event.bus.AllowConcurrentEvents;
import org.xbib.event.bus.Subscribe;
import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManager;
import org.xbib.event.common.PathEventImpl; import org.xbib.event.common.PathEventImpl;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
@ -10,6 +14,7 @@ import java.io.BufferedWriter;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Map;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -31,9 +36,9 @@ public class PathEventManagerTest {
.build(); .build();
Path testTxt = path.resolve("incoming").resolve("test.txt"); Path testTxt = path.resolve("incoming").resolve("test.txt");
try (BufferedWriter bufferedWriter = Files.newBufferedWriter(testTxt)) { try (BufferedWriter bufferedWriter = Files.newBufferedWriter(testTxt)) {
bufferedWriter.write("Hello"); bufferedWriter.write("{\"Hello\":\"world\"}");
} }
Thread.sleep(2000L); Thread.sleep(1000L);
eventManager.shutdown(); eventManager.shutdown();
// extra destroy to clean up test // extra destroy to clean up test
eventManager.getPathEventManagerService().destroy(); eventManager.getPathEventManagerService().destroy();
@ -55,9 +60,30 @@ public class PathEventManagerTest {
.build(); .build();
Path testTxt = path.resolve("incoming").resolve("test.txt"); Path testTxt = path.resolve("incoming").resolve("test.txt");
try (BufferedWriter bufferedWriter = Files.newBufferedWriter(testTxt)) { try (BufferedWriter bufferedWriter = Files.newBufferedWriter(testTxt)) {
bufferedWriter.write("Hello"); bufferedWriter.write("{\"Hello\":\"world\"}");
} }
Thread.sleep(2000L); Thread.sleep(1000L);
eventManager.shutdown();
// extra destroy to clean up test
eventManager.getPathEventManagerService().destroy();
}
@Test
public void testPathEventByApi() throws Exception {
TestPathExtEventConsumer consumer = new TestPathExtEventConsumer();
EventManager eventManager = EventManager.builder()
.register("path-ext", PathExtEvent.class)
.register(consumer)
.build();
// create by API
Path path = Files.createTempDirectory("testpath");
eventManager.getPathEventManagerService().add("test", path, "path-ext", TimeValue.timeValueHours(72));
// publish
PathEventService pathEventService = eventManager.getPathEventManagerService().getPathEventService("test");
eventManager.getPathEventManagerService()
.publishJson(pathEventService, "key", Map.of("hello", "world"));
// everything done
Thread.sleep(1000L);
eventManager.shutdown(); eventManager.shutdown();
// extra destroy to clean up test // extra destroy to clean up test
eventManager.getPathEventManagerService().destroy(); eventManager.getPathEventManagerService().destroy();
@ -70,4 +96,25 @@ public class PathEventManagerTest {
logger.log(Level.INFO, "I'm the path ext event"); logger.log(Level.INFO, "I'm the path ext event");
} }
} }
public static class TestPathEventConsumer implements EventConsumer {
@Subscribe
@AllowConcurrentEvents
void onEvent(PathEvent event) {
logger.log(Level.INFO, "received path event, path = " + event.getPath() +
" payload = " + event.getPayload());
}
}
public static class TestPathExtEventConsumer implements EventConsumer {
@Subscribe
@AllowConcurrentEvents
void onEvent(PathExtEvent event) {
logger.log(Level.INFO, "received path ext event, path = " + event.getPath() +
" payload = " + event.getPayload());
}
}
} }

View file

@ -1,7 +1,7 @@
package org.xbib.event.timer; package org.xbib.event.timer;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.xbib.event.Payload; import org.xbib.event.common.PayloadImpl;
import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManager;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
@ -22,7 +22,7 @@ public class TimerEventManagerTest {
.register(consumer) .register(consumer)
.build(); .build();
TimerEventManagerService timerEventManager = eventManager.getTimerEventManagerService(); TimerEventManagerService timerEventManager = eventManager.getTimerEventManagerService();
Payload payload = new Payload(Map.of("a", "b")); PayloadImpl payload = new PayloadImpl(Map.of("a", "b"));
timerEventManager.schedule("testtimerevent", Instant.now().plusSeconds(5L), payload); timerEventManager.schedule("testtimerevent", Instant.now().plusSeconds(5L), payload);
Thread.sleep(10000L); Thread.sleep(10000L);
timerEventManager.shutdown(); timerEventManager.shutdown();

View file

@ -1,7 +1,7 @@
dependencies { dependencies {
api project(':event-common') api project(':event-common')
api libs.net.http.server.netty api libs.net.http.server.netty.secure
api libs.net.http.client.netty api libs.net.http.client.netty.secure
implementation libs.settings.datastructures.json implementation libs.settings.datastructures.json
implementation libs.datastructures.json.tiny implementation libs.datastructures.json.tiny
} }

View file

@ -2,7 +2,10 @@ package org.xbib.event.net.http.test;
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.xbib.event.Event;
import org.xbib.event.Listener;
import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManager;
import org.xbib.event.common.GenericEventImpl;
import org.xbib.event.net.http.HttpEventManagerService; import org.xbib.event.net.http.HttpEventManagerService;
import org.xbib.net.NetworkClass; import org.xbib.net.NetworkClass;
import org.xbib.net.URL; import org.xbib.net.URL;
@ -22,15 +25,13 @@ import org.xbib.net.http.server.netty.NettyHttpServerConfig;
import org.xbib.net.http.server.route.BaseHttpRouter; import org.xbib.net.http.server.route.BaseHttpRouter;
import org.xbib.net.http.server.route.HttpRouter; import org.xbib.net.http.server.route.HttpRouter;
import org.xbib.net.http.server.service.BaseHttpService; import org.xbib.net.http.server.service.BaseHttpService;
import org.xbib.net.util.JsonUtil;
import java.io.IOException;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
public class HttpEventTest { public class HttpEventTest {
@ -48,6 +49,7 @@ public class HttpEventTest {
nettyHttpServerConfig.setDebug(true); nettyHttpServerConfig.setDebug(true);
EventManager eventManager = EventManager.builder() EventManager eventManager = EventManager.builder()
.register("dummy", DummyEvent.class)
.build(); .build();
HttpEventManagerService httpEventManagerService = eventManager.getEventManagerService(HttpEventManagerService.class); HttpEventManagerService httpEventManagerService = eventManager.getEventManagerService(HttpEventManagerService.class);
@ -58,10 +60,12 @@ public class HttpEventTest {
.addService(BaseHttpService.builder() .addService(BaseHttpService.builder()
.setPath("/event") .setPath("/event")
.setHandler(ctx -> { .setHandler(ctx -> {
DummyEvent dummyEvent = EventManager.eventOf("dummy", DummyEvent.class);
dummyEvent.setListener(new DummyEventListener());
ctx.status(HttpResponseStatus.OK) ctx.status(HttpResponseStatus.OK)
.header(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN) .header(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.TEXT_PLAIN)
.charset(StandardCharsets.UTF_8) .charset(StandardCharsets.UTF_8)
.body(ctx.getRequest().asJson()) .body(dummyEvent.toJson())
.done(); .done();
}) })
.build()) .build())
@ -93,14 +97,9 @@ public class HttpEventTest {
" status = " + resp.getStatus() + " status = " + resp.getStatus() +
" header = " + resp.getHeaders() + " header = " + resp.getHeaders() +
" body = " + body); " body = " + body);
try { DummyEvent dummyEvent = (DummyEvent) EventManager.eventFromJson(body);
Map<String, Object> map = JsonUtil.toMap(body); assertNotNull(dummyEvent);
org.xbib.net.http.server.netty.HttpRequest httpRequest = org.xbib.net.http.server.netty.HttpRequest.builder() logger.log(Level.INFO, "dummy event transported = " + dummyEvent);
.parse(map).build();
logger.log(Level.INFO, "parsed http request = " + httpRequest.asJson());
} catch (IOException e) {
throw new RuntimeException(e);
}
received.set(true); received.set(true);
}) })
.build(); .build();
@ -109,4 +108,19 @@ public class HttpEventTest {
assertTrue(received.get()); assertTrue(received.get());
} }
} }
public static class DummyEvent extends GenericEventImpl {
public DummyEvent(EventManager.EventBuilder builder) {
super(builder);
}
}
public static class DummyEventListener implements Listener {
@Override
public void listen(Event event) {
logger.log(Level.INFO, "got event " + event);
}
}
} }

View file

@ -0,0 +1,4 @@
org.xbib.net.http.server.netty.http1.Http1ChannelInitializer
org.xbib.net.http.server.netty.http2.Http2ChannelInitializer
org.xbib.net.http.server.netty.secure.http1.Https1ChannelInitializer
org.xbib.net.http.server.netty.secure.http2.Https2ChannelInitializer

View file

@ -1,3 +1,3 @@
group = org.xbib group = org.xbib
name = event name = event
version = 0.2.0 version = 0.3.0

View file

@ -17,18 +17,16 @@ dependencyResolutionManagement {
libs { libs {
version('gradle', '8.5') version('gradle', '8.5')
version('datastructures', '5.0.6') version('datastructures', '5.0.6')
version('net', '4.0.4') version('net-http', '4.4.0')
version('net-http', '4.1.0') version('netty', '4.1.107.Final')
version('netty', '4.1.104.Final')
library('netty-handler', 'io.netty', 'netty-handler').versionRef('netty') library('netty-handler', 'io.netty', 'netty-handler').versionRef('netty')
library('net', 'org.xbib', 'net').versionRef('net')
library('datastructures-common', 'org.xbib', 'datastructures-common').versionRef('datastructures') library('datastructures-common', 'org.xbib', 'datastructures-common').versionRef('datastructures')
library('datastructures-json-tiny', 'org.xbib', 'datastructures-json-tiny').versionRef('datastructures') library('datastructures-json-tiny', 'org.xbib', 'datastructures-json-tiny').versionRef('datastructures')
library('settings-api', 'org.xbib', 'settings-api').versionRef('datastructures') library('settings-api', 'org.xbib', 'settings-api').versionRef('datastructures')
library('settings-datastructures-json', 'org.xbib', 'settings-datastructures-json').versionRef('datastructures') library('settings-datastructures-json', 'org.xbib', 'settings-datastructures-json').versionRef('datastructures')
library('time', 'org.xbib', 'time').version('4.0.0') library('time', 'org.xbib', 'time').version('4.0.0')
library('net-http-server-netty', 'org.xbib', 'net-http-server-netty-secure').versionRef('net-http') library('net-http-server-netty-secure', 'org.xbib', 'net-http-server-netty-secure').versionRef('net-http')
library('net-http-client-netty', 'org.xbib', 'net-http-client-netty-secure').versionRef('net-http') library('net-http-client-netty-secure', 'org.xbib', 'net-http-client-netty-secure').versionRef('net-http')
library('reactivestreams', 'org.reactivestreams', 'reactive-streams').version('1.0.3') library('reactivestreams', 'org.reactivestreams', 'reactive-streams').version('1.0.3')
library('rxjava3', 'io.reactivex.rxjava3', 'rxjava').version('3.0.3') library('rxjava3', 'io.reactivex.rxjava3', 'rxjava').version('3.0.3')
} }