register event types

This commit is contained in:
Jörg Prante 2024-01-24 17:03:17 +01:00
parent c8141c3755
commit b96466515c
24 changed files with 322 additions and 327 deletions

View file

@ -12,6 +12,7 @@ module org.xbib.event.common {
exports org.xbib.event.wal;
uses EventManagerService;
uses org.xbib.event.EventConsumer;
uses org.xbib.event.Event;
requires org.xbib.event.api;
requires org.xbib.settings.api;
requires org.xbib.settings.datastructures.json;

View file

@ -6,7 +6,7 @@ import java.util.logging.Logger;
import org.xbib.event.Event;
import org.xbib.event.bus.EventBus;
import org.xbib.event.common.EventImpl;
import org.xbib.event.common.EventManager;
public class ClockEventService implements Callable<Integer> {
@ -33,7 +33,7 @@ public class ClockEventService implements Callable<Integer> {
logger.log(Level.FINE, "clock event " + name + " suspended");
return 1;
} else {
Event clockEvent = EventImpl.builder()
Event clockEvent = EventManager.eventBuilder()
.setType("clock")
.build();
eventBus.post(clockEvent);

View file

@ -2,9 +2,9 @@ package org.xbib.event.common;
import org.xbib.event.ClockEvent;
public class ClockEventImpl extends EventImpl implements ClockEvent {
public class ClockEventImpl extends EventManager.EventImpl implements ClockEvent {
public ClockEventImpl(EventBuilder builder) {
public ClockEventImpl(EventManager.EventBuilder builder) {
super(builder);
}
}

View file

@ -1,111 +0,0 @@
package org.xbib.event.common;
import org.xbib.event.Event;
import org.xbib.event.Listener;
import org.xbib.event.Payload;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
public class EventBuilder {
Listener listener;
String type;
String code;
String message;
Instant scheduled;
Instant created;
Payload payload;
Path path;
String base;
String suffix;
long fileSize;
long maxFileSize;
EventBuilder() {
this.maxFileSize = -1L;
}
public EventBuilder setListener(Listener listener) {
this.listener = listener;
return this;
}
public EventBuilder setType(String type) {
this.type = type;
return this;
}
public EventBuilder setCode(String code) {
this.code = code;
return this;
}
public EventBuilder setMessage(String message) {
this.message = message;
return this;
}
public EventBuilder setScheduledFor(Instant scheduled) {
this.scheduled = scheduled;
return this;
}
public EventBuilder setPayload(Payload payload) {
this.payload = payload;
return this;
}
public EventBuilder setMaxFileSize(long maxFileSize) {
this.maxFileSize = maxFileSize;
return this;
}
public EventBuilder setPath(Path path) throws IOException {
this.path = path;
base = getBase(path);
suffix = getSuffix(path);
fileSize = Files.size(path);
if (maxFileSize != -1L && fileSize > maxFileSize) {
throw new IOException("file size too large");
}
return this;
}
public Event build() {
this.created = Instant.now();
return switch (type) {
case "clock" -> new ClockEventImpl(this);
case "timer" -> new TimerEventImpl(this);
case "path" -> new PathEventImpl(this);
case "filefollow" -> new FileFollowEventImpl(this);
case "generic" -> new GenericEventImpl(this, listener);
default -> new EventImpl(this);
};
}
private static String getBase(Path path) {
String name = path.getFileName().toString();
int pos = name.lastIndexOf('.');
return pos >= 0 ? name.substring(0, pos) : name;
}
private static String getSuffix(Path path) {
String name = path.getFileName().toString();
int pos = name.lastIndexOf('.');
return pos >= 0 ? name.substring(pos + 1) : null;
}
}

View file

@ -1,125 +0,0 @@
package org.xbib.event.common;
import org.xbib.datastructures.json.tiny.Json;
import org.xbib.event.Event;
import org.xbib.event.Listener;
import org.xbib.event.Payload;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;
import java.time.Instant;
import java.util.Map;
public class EventImpl implements Event {
private final EventBuilder builder;
EventImpl(EventBuilder builder) {
this.builder = builder;
}
public static EventBuilder builder() {
return new EventBuilder();
}
public Listener getListener() {
return builder.listener;
}
@Override
public String getType() {
return builder.type;
}
@Override
public String getCode() {
return builder.code;
}
@Override
public String getMessage() {
return builder.message;
}
@Override
public Payload getPayload() {
return builder.payload;
}
@Override
public Instant getCreated() {
return builder.created;
}
@Override
public Instant getScheduledFor() {
return builder.scheduled;
}
@Override
public Path getPath() {
return builder.path;
}
@Override
public String getBase() {
return builder.base;
}
@Override
public String getSuffix() {
return builder.suffix;
}
@Override
public long getFileSize() {
return builder.fileSize;
}
public static Event fromFile(Path file) throws IOException {
return fromJson(Files.readString(file));
}
public static Event fromJson(String json) {
Map<String, Object> map = Json.toMap(json);
return builder()
.setType(map.getOrDefault("type", "generic").toString())
.setPayload(new Payload(map))
.build();
}
public String toJson() throws IOException {
return Json.toString(builder.payload);
}
@Override
public boolean isNullEvent() {
return builder.type == null;
}
@Override
public String toString() {
return "Event[path = " + builder.path + ", base = " + builder.base + ", suffix = " + builder.suffix + " payload = " + builder.payload +"]";
}
public void success() throws IOException {
if (builder.path != null) {
Files.setLastModifiedTime(builder.path, FileTime.from(Instant.now()));
Files.move(builder.path, builder.path.getParent().resolve(Event.SUCCESS)
.resolve(builder.path.getFileName()).toAbsolutePath(),
StandardCopyOption.REPLACE_EXISTING);
}
}
public void fail() throws IOException {
Files.setLastModifiedTime(builder.path, FileTime.from(Instant.now()));
Files.move(builder.path, builder.path.getParent().resolve(Event.FAIL)
.resolve(builder.path.getFileName()).toAbsolutePath(),
StandardCopyOption.REPLACE_EXISTING);
}
}

View file

@ -1,11 +1,21 @@
package org.xbib.event.common;
import java.lang.reflect.InvocationTargetException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import org.xbib.datastructures.json.tiny.Json;
import org.xbib.event.Event;
import org.xbib.event.EventConsumer;
import org.xbib.event.FileFollowEvent;
import org.xbib.event.Listener;
import org.xbib.event.Payload;
import org.xbib.event.bus.AsyncEventBus;
import org.xbib.event.bus.EventBus;
import org.xbib.event.bus.SubscriberExceptionContext;
@ -31,12 +41,24 @@ public final class EventManager {
private static final Logger logger = Logger.getLogger(EventManager.class.getName());
private final Builder builder;
private final EventManagerBuilder builder;
private static final Map<String, Class<? extends Event>> eventTypes = new HashMap<>();
private final Map<Class<? extends EventManagerService>, EventManagerService> eventManagerServices;
private EventManager(Builder builder) {
private EventManager(EventManagerBuilder builder) {
this.builder = builder;
eventTypes.put("null", NullEvent.class);
eventTypes.put("generic", GenericEventImpl.class);
eventTypes.put("path", PathEventImpl.class);
eventTypes.put("clock", ClockEventImpl.class);
eventTypes.put("timer", TimerEventImpl.class);
eventTypes.put("filefollow", FileFollowEventImpl.class);
for (Event event : ServiceLoader.load(Event.class)) {
eventTypes.put(event.getType(), event.getClass());
}
logger.log(Level.INFO, "installed events = " + eventTypes.keySet());
this.eventManagerServices = new HashMap<>();
eventManagerServices.put(GenericEventManagerService.class, new GenericEventManagerService().init(this));
eventManagerServices.put(ClockEventManagerService.class, new ClockEventManagerService().init(this));
@ -49,6 +71,14 @@ public final class EventManager {
logger.log(Level.INFO, "installed event service managers = " + eventManagerServices.keySet());
}
public static EventManagerBuilder builder() {
return new EventManagerBuilder();
}
public static EventBuilder eventBuilder() {
return new EventBuilder();
}
public Settings getSettings() {
return builder.settings;
}
@ -65,11 +95,7 @@ public final class EventManager {
return builder.executorService;
}
public static Builder builder() {
return new Builder();
}
public void submit(Event event) {
public void dispatch(Event event) {
getGenericEventManagerService().post(event);
}
@ -113,7 +139,7 @@ public final class EventManager {
}
}
public static class Builder {
public static class EventManagerBuilder {
private Settings settings;
@ -129,48 +155,53 @@ public final class EventManager {
private final List<EventConsumer> eventConsumers;
private Builder() {
private EventManagerBuilder() {
this.eventConsumers = new ArrayList<>();
this.subscriberExceptionHandler = new EventManagerExceptionHandler();
}
public Builder setSettings(Settings settings) {
public EventManagerBuilder setSettings(Settings settings) {
this.settings = settings;
return this;
}
public Builder setThreadCount(int threadCount) {
public EventManagerBuilder setThreadCount(int threadCount) {
this.threadcount = threadCount;
return this;
}
public Builder setExecutorService(ExecutorService executorService) {
public EventManagerBuilder setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
return this;
}
public Builder setClassLoader(ClassLoader classLoader) {
public EventManagerBuilder setClassLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
return this;
}
public Builder setEventBus(EventBus eventBus) {
public EventManagerBuilder setEventBus(EventBus eventBus) {
this.eventBus = eventBus;
return this;
}
public Builder register(EventConsumer eventConsumer) {
public EventManagerBuilder register(String eventType, Class<? extends Event> eventClass) {
eventTypes.put(eventType, eventClass);
return this;
}
public EventManagerBuilder register(EventConsumer eventConsumer) {
Objects.requireNonNull(eventConsumer, "event consumer must not be null");
eventConsumers.add(eventConsumer);
return this;
}
public Builder setSubscriberExceptionHandler(SubscriberExceptionHandler subscriberExceptionHandler) {
public EventManagerBuilder setSubscriberExceptionHandler(SubscriberExceptionHandler subscriberExceptionHandler) {
this.subscriberExceptionHandler = subscriberExceptionHandler;
return this;
}
public Builder loadEventConsumers() {
public EventManagerBuilder loadEventConsumers() {
if (classLoader == null) {
classLoader = EventManager.class.getClassLoader();
}
@ -210,4 +241,226 @@ public final class EventManager {
logger.log(Level.SEVERE, exception.getMessage(), exception);
}
}
public static Event eventFromFile(Path file) throws IOException {
return eventFromJson(Files.readString(file));
}
public static Event eventFromJson(String json) {
Map<String, Object> map = Json.toMap(json);
return eventBuilder()
.setType(map.getOrDefault("type", "generic").toString())
.setPayload(new Payload(map))
.build();
}
public static class EventImpl implements Event {
private final EventBuilder builder;
EventImpl(EventBuilder builder) {
this.builder = builder;
}
public Listener getListener() {
return builder.listener;
}
@Override
public String getType() {
return builder.type;
}
@Override
public String getCode() {
return builder.code;
}
@Override
public String getMessage() {
return builder.message;
}
@Override
public Payload getPayload() {
return builder.payload;
}
@Override
public Instant getCreated() {
return builder.created;
}
@Override
public Instant getScheduledFor() {
return builder.scheduled;
}
@Override
public Path getPath() {
return builder.path;
}
@Override
public String getBase() {
return builder.base;
}
@Override
public String getSuffix() {
return builder.suffix;
}
@Override
public long getFileSize() {
return builder.fileSize;
}
public String toJson() throws IOException {
return Json.toString(builder.payload);
}
@Override
public boolean isNullEvent() {
return builder.type == null;
}
@Override
public String toString() {
return "Event[path = " + builder.path + ", base = " + builder.base + ", suffix = " + builder.suffix + " payload = " + builder.payload +"]";
}
public void success() throws IOException {
if (builder.path != null) {
Files.setLastModifiedTime(builder.path, FileTime.from(Instant.now()));
Files.move(builder.path, builder.path.getParent().resolve(Event.SUCCESS)
.resolve(builder.path.getFileName()).toAbsolutePath(),
StandardCopyOption.REPLACE_EXISTING);
}
}
public void fail() throws IOException {
Files.setLastModifiedTime(builder.path, FileTime.from(Instant.now()));
Files.move(builder.path, builder.path.getParent().resolve(Event.FAIL)
.resolve(builder.path.getFileName()).toAbsolutePath(),
StandardCopyOption.REPLACE_EXISTING);
}
}
public static class EventBuilder {
Listener listener;
String type;
String code;
String message;
Instant scheduled;
Instant created;
Payload payload;
Path path;
String base;
String suffix;
long fileSize;
long maxFileSize;
EventBuilder() {
this.maxFileSize = -1L;
}
public EventBuilder setListener(Listener listener) {
this.listener = listener;
return this;
}
public EventBuilder setType(String type) {
if (type != null && eventTypes.containsKey(type)) {
this.type = type;
} else {
throw new IllegalArgumentException("unregistered event type: " + type);
}
return this;
}
public EventBuilder setCode(String code) {
this.code = code;
return this;
}
public EventBuilder setMessage(String message) {
this.message = message;
return this;
}
public EventBuilder setScheduledFor(Instant scheduled) {
this.scheduled = scheduled;
return this;
}
public EventBuilder setPayload(Payload payload) {
this.payload = payload;
return this;
}
public EventBuilder setMaxFileSize(long maxFileSize) {
this.maxFileSize = maxFileSize;
return this;
}
public EventBuilder setPath(Path path) throws IOException {
this.path = path;
base = getBase(path);
suffix = getSuffix(path);
fileSize = Files.size(path);
if (maxFileSize != -1L && fileSize > maxFileSize) {
throw new IOException("file size too large");
}
return this;
}
public Event build() {
if (type == null) {
type = "null";
}
Class<? extends Event> cl = eventTypes.get(type);
if (cl == null) {
cl = NullEvent.class;
}
try {
this.created = Instant.now();
if (listener != null) {
return cl.getDeclaredConstructor(EventBuilder.class, Listener.class).newInstance(this, listener);
} else {
return cl.getDeclaredConstructor(EventBuilder.class).newInstance(this);
}
} catch (NoSuchMethodException | InstantiationException | IllegalAccessException |
InvocationTargetException e) {
logger.log(Level.WARNING, "unable to construct event object for type " + type + ", falling back to NullEvent");
return new NullEvent(this);
}
}
private static String getBase(Path path) {
String name = path.getFileName().toString();
int pos = name.lastIndexOf('.');
return pos >= 0 ? name.substring(0, pos) : name;
}
private static String getSuffix(Path path) {
String name = path.getFileName().toString();
int pos = name.lastIndexOf('.');
return pos >= 0 ? name.substring(pos + 1) : null;
}
}
}

View file

@ -2,9 +2,9 @@ package org.xbib.event.common;
import org.xbib.event.FileFollowEvent;
public class FileFollowEventImpl extends EventImpl implements FileFollowEvent {
public class FileFollowEventImpl extends EventManager.EventImpl implements FileFollowEvent {
public FileFollowEventImpl(EventBuilder builder) {
public FileFollowEventImpl(EventManager.EventBuilder builder) {
super(builder);
}
}

View file

@ -4,15 +4,15 @@ import org.xbib.event.Listener;
import java.util.Objects;
public class GenericEventImpl extends EventImpl {
public class GenericEventImpl extends EventManager.EventImpl {
private final EventBuilder builder;
private final EventManager.EventBuilder builder;
public GenericEventImpl(EventBuilder builder) {
public GenericEventImpl(EventManager.EventBuilder builder) {
this(builder, null);
}
public GenericEventImpl(EventBuilder builder, Listener listener) {
public GenericEventImpl(EventManager.EventBuilder builder, Listener listener) {
super(builder);
this.builder = builder;
this.builder.listener = Objects.requireNonNull(listener);

View file

@ -0,0 +1,9 @@
package org.xbib.event.common;
import org.xbib.event.Event;
public class NullEvent extends EventManager.EventImpl implements Event {
NullEvent(EventManager.EventBuilder builder) {
super(builder);
}
}

View file

@ -2,9 +2,9 @@ package org.xbib.event.common;
import org.xbib.event.PathEvent;
public class PathEventImpl extends EventImpl implements PathEvent {
public class PathEventImpl extends EventManager.EventImpl implements PathEvent {
public PathEventImpl(EventBuilder builder) {
public PathEventImpl(EventManager.EventBuilder builder) {
super(builder);
}
}

View file

@ -2,9 +2,9 @@ package org.xbib.event.common;
import org.xbib.event.TimerEvent;
public class TimerEventImpl extends EventImpl implements TimerEvent {
public class TimerEventImpl extends EventManager.EventImpl implements TimerEvent {
public TimerEventImpl(EventBuilder builder) {
public TimerEventImpl(EventManager.EventBuilder builder) {
super(builder);
}
}

View file

@ -2,7 +2,7 @@ package org.xbib.event.path;
import org.xbib.event.Event;
import org.xbib.event.bus.EventBus;
import org.xbib.event.common.EventImpl;
import org.xbib.event.common.EventManager;
import org.xbib.settings.Settings;
import java.io.Closeable;
@ -89,7 +89,7 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
String content = readRange(channel, lastSize, currentSize);
// split content by line, this allows pattern matching without preprocessing in worker
for (String line : content.split("\n")) {
Event event = EventImpl.builder()
Event event = EventManager.eventBuilder()
.setType("filefollow")
.setCode(base.toString())
.setPath(path)

View file

@ -3,7 +3,7 @@ package org.xbib.event.path;
import org.xbib.datastructures.api.TimeValue;
import org.xbib.event.Event;
import org.xbib.event.bus.EventBus;
import org.xbib.event.common.EventImpl;
import org.xbib.event.common.EventManager;
import java.io.Closeable;
import java.io.IOException;
@ -147,7 +147,7 @@ public class PathEventService implements Callable<Integer>, Closeable {
private void postEvent(Path file) {
try {
Event event = EventImpl.fromFile(file);
Event event = EventManager.eventFromFile(file);
eventBus.post(event);
eventCount++;
} catch (IOException e) {
@ -157,7 +157,7 @@ public class PathEventService implements Callable<Integer>, Closeable {
private void failEvent(Path file) {
try {
Event event = EventImpl.fromFile(file);
Event event = EventManager.eventFromFile(file);
event.fail();
} catch (IOException e) {
logger.log(Level.SEVERE, "unable to fail event because of " + e.getMessage());

View file

@ -34,7 +34,6 @@ public class TimerEventManagerService implements EventManagerService {
public TimerEventManagerService init(EventManager eventManager) {
Settings settings = eventManager.getSettings();
EventBus eventBus = eventManager.getEventBus();
ClassLoader classLoader = eventManager.getClassLoader();
this.services = new LinkedHashMap<>();
for (Map.Entry<String, Settings> entry : settings.getGroups("event.timer").entrySet()) {
String name = entry.getKey();
@ -51,9 +50,9 @@ public class TimerEventManagerService implements EventManagerService {
return this;
}
public boolean put(String service,
String timeSpec,
Payload payload) throws ParseException, IOException {
public boolean publish(String service,
String timeSpec,
Payload payload) throws ParseException, IOException {
if (services.containsKey(service)) {
Span span = Chronic.parse(timeSpec);
if (span != null) {
@ -71,9 +70,9 @@ public class TimerEventManagerService implements EventManagerService {
return false;
}
public boolean put(String service,
Instant instant,
Payload payload) throws IOException {
public boolean publish(String service,
Instant instant,
Payload payload) throws IOException {
if (services.containsKey(service)) {
services.get(service).schedule(instant, payload);
return true;

View file

@ -3,7 +3,7 @@ package org.xbib.event.timer;
import org.xbib.event.Event;
import org.xbib.event.Payload;
import org.xbib.event.bus.EventBus;
import org.xbib.event.common.EventImpl;
import org.xbib.event.common.EventManager;
import org.xbib.event.persistence.PersistenceStore;
import java.io.Closeable;
@ -97,7 +97,7 @@ class TimerEventService implements Closeable {
@Override
public void run() {
try {
Event timerEvent = EventImpl.builder()
Event timerEvent = EventManager.eventBuilder()
.setType("timer")
.setPayload(payload)
.build();

View file

@ -1,10 +0,0 @@
package org.xbib.event.clock;
import org.xbib.event.common.ClockEventImpl;
import org.xbib.event.common.EventBuilder;
public class TestClockEvent extends ClockEventImpl {
public TestClockEvent(EventBuilder builder) {
super(builder);
}
}

View file

@ -1,10 +1,10 @@
package org.xbib.event.clock;
import org.xbib.event.ClockEvent;
import org.xbib.event.EventConsumer;
import org.xbib.event.bus.AllowConcurrentEvents;
import org.xbib.event.bus.Subscribe;
import java.io.IOException;
import java.time.Instant;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -15,7 +15,7 @@ public class TestClockEventConsumer implements EventConsumer {
@Subscribe
@AllowConcurrentEvents
void onEvent(TestClockEvent event) {
logger.log(Level.INFO, "received test clock event on " + Instant.now() + " event instant = " + event.getCreated());
void onEvent(ClockEvent event) {
logger.log(Level.INFO, "received clock event on " + Instant.now() + " event instant = " + event.getCreated());
}
}

View file

@ -4,7 +4,6 @@ import org.junit.jupiter.api.Test;
import org.xbib.event.Event;
import org.xbib.event.EventConsumer;
import org.xbib.event.bus.Subscribe;
import org.xbib.event.common.EventImpl;
import org.xbib.event.common.EventManager;
import org.xbib.event.common.GenericEventImpl;
import org.xbib.settings.Settings;
@ -29,7 +28,7 @@ public class GenericEventManagerTest {
.setSettings(settings)
.register(consumer)
.build();
Event event = EventImpl.builder()
Event event = EventManager.eventBuilder()
.setType("generic")
.setListener(e -> logger.log(Level.INFO, "received event " + e))
.build();
@ -47,7 +46,7 @@ public class GenericEventManagerTest {
.register(consumer)
.build();
CompletableFuture<Event> future = new CompletableFuture<>();
Event event = EventImpl.builder()
Event event = EventManager.eventBuilder()
.setType("generic")
.setListener(e -> {
logger.log(Level.INFO, "received event " + e);
@ -72,7 +71,7 @@ public class GenericEventManagerTest {
.loadEventConsumers()
.build();
CompletableFuture<Event> future = new CompletableFuture<>();
Event event = GenericEventImpl.builder()
Event event = EventManager.eventBuilder()
.setType("generic")
.setListener(e -> {
logger.log(Level.INFO, "received event " + e);

View file

@ -1,10 +0,0 @@
package org.xbib.event.path;
import org.xbib.event.common.EventBuilder;
import org.xbib.event.common.FileFollowEventImpl;
public class TestFileFollowEvent extends FileFollowEventImpl {
public TestFileFollowEvent(EventBuilder builder) {
super(builder);
}
}

View file

@ -1,6 +1,7 @@
package org.xbib.event.path;
import org.xbib.event.EventConsumer;
import org.xbib.event.FileFollowEvent;
import org.xbib.event.bus.AllowConcurrentEvents;
import org.xbib.event.bus.Subscribe;
@ -13,7 +14,7 @@ public class TestFileFollowEventConsumer implements EventConsumer {
@Subscribe
@AllowConcurrentEvents
void onEvent(TestFileFollowEvent event) {
logger.log(Level.INFO, "received filefollow event path = " + event.getPath() + " content = " + event.getMessage());
void onEvent(FileFollowEvent event) {
logger.log(Level.INFO, "received filefollow event, path = " + event.getPath() + " content = " + event.getMessage());
}
}

View file

@ -1,11 +0,0 @@
package org.xbib.event.timer;
import org.xbib.event.common.EventBuilder;
import org.xbib.event.common.TimerEventImpl;
public class TestTimerEvent extends TimerEventImpl {
public TestTimerEvent(EventBuilder builder) {
super(builder);
}
}

View file

@ -1,6 +1,7 @@
package org.xbib.event.timer;
import org.xbib.event.EventConsumer;
import org.xbib.event.TimerEvent;
import org.xbib.event.bus.AllowConcurrentEvents;
import org.xbib.event.bus.Subscribe;
@ -14,8 +15,8 @@ public class TestTimerEventConsumer implements EventConsumer {
@Subscribe
@AllowConcurrentEvents
void onEvent(TestTimerEvent event) {
logger.log(Level.INFO, "received test timer event on " + Instant.now() + " event instant = " + event.getCreated());
void onEvent(TimerEvent event) {
logger.log(Level.INFO, "received timer event on " + Instant.now() + " event instant = " + event.getCreated());
}
}

View file

@ -23,7 +23,7 @@ public class TimerEventManagerTest {
.build();
TimerEventManagerService timerEventManager = eventManager.getTimerEventManagerService();
Payload payload = new Payload(Map.of("a", "b"));
timerEventManager.put("testtimerevent", Instant.now().plusSeconds(5L), payload);
timerEventManager.publish("testtimerevent", Instant.now().plusSeconds(5L), payload);
Thread.sleep(10000L);
timerEventManager.shutdown();
}

View file

@ -2,7 +2,6 @@ package org.xbib.event.net.http;
import org.xbib.event.Event;
import org.xbib.event.common.EventManager;
import org.xbib.event.common.EventImpl;
import org.xbib.net.http.HttpHeaderNames;
import org.xbib.net.http.HttpHeaderValues;
import org.xbib.net.http.HttpMethod;
@ -28,11 +27,11 @@ public class HttpEventReceiverService {
.setPath("/event/{type}")
.setMethod(HttpMethod.POST)
.setHandler(ctx -> {
Event event = EventImpl.fromJson(ctx.getRequest().asJson());
Event event = EventManager.eventFromJson(ctx.getRequest().asJson());
if (event.isNullEvent()) {
ctx.status(NOT_FOUND).done();
} else {
eventManager.submit(event);
eventManager.dispatch(event);
ctx.status(OK)
.header(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
.charset(StandardCharsets.UTF_8)