add event http service

This commit is contained in:
Jörg Prante 2024-01-15 17:32:01 +01:00
parent 09a7a95b25
commit 7c5a1f1a2b
25 changed files with 156 additions and 70 deletions

View file

@ -1,33 +0,0 @@
package org.xbib.event;
import java.util.Map;
public class DefaultEvent implements Event {
private String key;
private Map<String, Object> map;
public DefaultEvent() {
}
@Override
public void setKey(String key) {
this.key = key;
}
@Override
public String getKey() {
return key;
}
@Override
public void setMap(Map<String, Object> map) {
this.map = map;
}
@Override
public Map<String, Object> getMap() {
return map;
}
}

View file

@ -4,9 +4,9 @@ import java.util.Map;
public interface Event { public interface Event {
void setKey(String key); void setType(String type);
String getKey(); String getType();
void setMap(Map<String, Object> map); void setMap(Map<String, Object> map);

View file

@ -1,14 +1,14 @@
package org.xbib.event.clock; package org.xbib.event.clock;
import org.xbib.event.DefaultEvent; import org.xbib.event.common.EventImpl;
import java.time.Instant; import java.time.Instant;
public class DefaultClockEvent extends DefaultEvent implements ClockEvent { public class ClockEventImpl extends EventImpl implements ClockEvent {
private Instant instant; private Instant instant;
public DefaultClockEvent() { public ClockEventImpl() {
} }
@Override @Override

View file

@ -29,7 +29,7 @@ public class ClockEventService implements Callable<Integer> {
} }
@Override @Override
public Integer call() throws Exception { public Integer call() {
try { try {
if (manager.getSuspended().contains(name)) { if (manager.getSuspended().contains(name)) {
logger.log(Level.FINE, "clock event " + name + " suspended"); logger.log(Level.FINE, "clock event " + name + " suspended");

View file

@ -0,0 +1,53 @@
package org.xbib.event.common;
import org.xbib.datastructures.json.tiny.Json;
import org.xbib.event.Event;
import java.io.IOException;
import java.util.Map;
public class EventImpl implements Event {
private String type;
private Map<String, Object> map;
public EventImpl() {
}
@Override
public void setType(String type) {
this.type = type;
}
@Override
public String getType() {
return type;
}
@Override
public void setMap(Map<String, Object> map) {
this.map = map;
}
@Override
public Map<String, Object> getMap() {
return map;
}
public static EventImpl fromJson(String json) {
Map<String, Object> map = Json.toMap(json);
EventImpl event = new EventImpl();
event.setType((String) map.get("type"));
event.setMap(map);
return event;
}
public String toJson() throws IOException {
return Json.toString(map);
}
public boolean isNullEvent() {
return type == null;
}
}

View file

@ -4,6 +4,7 @@ import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.ServiceLoader; import java.util.ServiceLoader;
import org.xbib.event.Event;
import org.xbib.event.EventConsumer; import org.xbib.event.EventConsumer;
import org.xbib.event.bus.AsyncEventBus; import org.xbib.event.bus.AsyncEventBus;
import org.xbib.event.bus.SubscriberExceptionContext; import org.xbib.event.bus.SubscriberExceptionContext;
@ -56,6 +57,10 @@ public final class EventManager {
return new Builder(settings); return new Builder(settings);
} }
public void submit(Event event) {
getGenericEventManagerService().post(event);
}
public EventManagerService getEventManagerService(Class<? extends EventManagerService> cl) { public EventManagerService getEventManagerService(Class<? extends EventManagerService> cl) {
return services.get(cl); return services.get(cl);
} }

View file

@ -1,20 +1,20 @@
package org.xbib.event.generic; package org.xbib.event.generic;
import org.xbib.event.DefaultEvent; import org.xbib.event.common.EventImpl;
public class DefaultGenericEvent extends DefaultEvent implements GenericEvent { public class GenericEventImpl extends EventImpl implements GenericEvent {
private Listener listener; private Listener listener;
public DefaultGenericEvent() { public GenericEventImpl() {
this(null); this(null);
} }
public DefaultGenericEvent(Listener listener) { public GenericEventImpl(Listener listener) {
this.listener = listener; this.listener = listener;
} }
public DefaultGenericEvent setListener(Listener listener) { public GenericEventImpl setListener(Listener listener) {
this.listener = listener; this.listener = listener;
return this; return this;
} }

View file

@ -4,7 +4,6 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import org.xbib.event.bus.AsyncEventBus;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.EventBus;
import org.xbib.event.bus.Subscriber; import org.xbib.event.bus.Subscriber;
import org.xbib.event.bus.SubscriberRegistry; import org.xbib.event.bus.SubscriberRegistry;
@ -28,7 +27,7 @@ public class GenericEventManagerService implements EventManagerService {
eventBus.post(event); eventBus.post(event);
} }
public void post(DefaultGenericEvent event, public void post(GenericEventImpl event,
CompletableFuture<GenericEvent> future) { CompletableFuture<GenericEvent> future) {
SubscriberRegistry subscriberRegistry = eventBus.getSubscribers(); SubscriberRegistry subscriberRegistry = eventBus.getSubscribers();
Set<Subscriber> set = subscriberRegistry.getSubscribersForTesting(event.getClass()); Set<Subscriber> set = subscriberRegistry.getSubscribersForTesting(event.getClass());

View file

@ -1,16 +1,16 @@
package org.xbib.event.path; package org.xbib.event.path;
import org.xbib.event.DefaultEvent; import org.xbib.event.common.EventImpl;
import java.nio.file.Path; import java.nio.file.Path;
public class DefaultFileFollowEvent extends DefaultEvent implements FileFollowEvent { public class FileFollowEventImpl extends EventImpl implements FileFollowEvent {
private Path path; private Path path;
private String content; private String content;
public DefaultFileFollowEvent() { public FileFollowEventImpl() {
} }
@Override @Override

View file

@ -43,7 +43,7 @@ public class FileFollowEventManagerService implements EventManagerService, Close
try { try {
Path base = Paths.get(baseStr); Path base = Paths.get(baseStr);
Pattern pattern = Pattern.compile(patternStr); Pattern pattern = Pattern.compile(patternStr);
String className = definition.get("class", DefaultFileFollowEvent.class.getName()); String className = definition.get("class", FileFollowEventImpl.class.getName());
Class<? extends FileFollowEvent> eventClass = (Class<? extends FileFollowEvent>) classLoader.loadClass(className); Class<? extends FileFollowEvent> eventClass = (Class<? extends FileFollowEvent>) classLoader.loadClass(className);
FileFollowEventService fileFollowEventService = new FileFollowEventService(definition, eventBus, base, pattern, eventClass); FileFollowEventService fileFollowEventService = new FileFollowEventService(definition, eventBus, base, pattern, eventClass);
Future<?> future = executorService.submit(fileFollowEventService); Future<?> future = executorService.submit(fileFollowEventService);

View file

@ -92,7 +92,7 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
// split content by line, this allows pattern matching without preprocessing in worker // split content by line, this allows pattern matching without preprocessing in worker
for (String line : content.split("\n")) { for (String line : content.split("\n")) {
FileFollowEvent event = eventClass.getDeclaredConstructor().newInstance(); FileFollowEvent event = eventClass.getDeclaredConstructor().newInstance();
event.setKey(base.toString()); event.setType(base.toString());
event.setPath(path); event.setPath(path);
event.setContent(line); event.setContent(line);
eventBus.post(event); eventBus.post(event);

View file

@ -1,6 +1,6 @@
package org.xbib.event.path; package org.xbib.event.path;
import org.xbib.event.DefaultEvent; import org.xbib.event.common.EventImpl;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files; import java.nio.file.Files;
@ -9,7 +9,7 @@ import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime; import java.nio.file.attribute.FileTime;
import java.time.Instant; import java.time.Instant;
public class DefaultPathEvent extends DefaultEvent implements PathEvent { public class PathEventImpl extends EventImpl implements PathEvent {
private Path path; private Path path;

View file

@ -58,7 +58,7 @@ public class PathEventManagerService implements EventManagerService, Closeable {
if (definition.getAsBoolean("enabled", true)) { if (definition.getAsBoolean("enabled", true)) {
int maxBytes = definition.getAsInt("maxfilesize", 10 * 1024 * 1024); // 10 MB int maxBytes = definition.getAsInt("maxfilesize", 10 * 1024 * 1024); // 10 MB
TimeValue lifetime = definition.getAsTime("lifetime", TimeValue.timeValueHours(72)); TimeValue lifetime = definition.getAsTime("lifetime", TimeValue.timeValueHours(72));
String className = definition.get("class", DefaultPathEvent.class.getName()); String className = definition.get("class", PathEventImpl.class.getName());
Class<? extends PathEvent> eventClass = (Class<? extends PathEvent>) classLoader.loadClass(className); Class<? extends PathEvent> eventClass = (Class<? extends PathEvent>) classLoader.loadClass(className);
Path p = path.resolve(name); Path p = path.resolve(name);
createPathEventService(name, p, maxBytes, lifetime, eventClass); createPathEventService(name, p, maxBytes, lifetime, eventClass);

View file

@ -215,7 +215,7 @@ public class PathEventService implements Callable<Integer>, Closeable {
private PathEvent toEvent(String base, Path file, String suffix, String json) { private PathEvent toEvent(String base, Path file, String suffix, String json) {
try { try {
PathEvent event = pathEventClass.getConstructor().newInstance(); PathEvent event = pathEventClass.getConstructor().newInstance();
event.setKey(base); event.setType(base);
event.setFile(file); event.setFile(file);
event.setSuffix(suffix); event.setSuffix(suffix);
event.setPath(path); // remember directory for fail() and success() event.setPath(path); // remember directory for fail() and success()

View file

@ -1,14 +1,14 @@
package org.xbib.event.timer; package org.xbib.event.timer;
import org.xbib.event.DefaultEvent; import org.xbib.event.common.EventImpl;
import java.time.Instant; import java.time.Instant;
public class DefaultTimerEvent extends DefaultEvent implements TimerEvent { public class TimerEventImpl extends EventImpl implements TimerEvent {
private Instant instant; private Instant instant;
public DefaultTimerEvent() { public TimerEventImpl() {
} }
@Override @Override

View file

@ -3,7 +3,7 @@ package org.xbib.event;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.xbib.event.bus.Subscribe; import org.xbib.event.bus.Subscribe;
import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManager;
import org.xbib.event.generic.DefaultGenericEvent; import org.xbib.event.generic.GenericEventImpl;
import org.xbib.event.generic.GenericEvent; import org.xbib.event.generic.GenericEvent;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
@ -24,7 +24,7 @@ public class EventManagerTest {
EventManager eventManager = EventManager.builder(settings) EventManager eventManager = EventManager.builder(settings)
.register(consumer) .register(consumer)
.build(); .build();
eventManager.getGenericEventManagerService().post(new DefaultGenericEvent(e -> { eventManager.getGenericEventManagerService().post(new GenericEventImpl(e -> {
logger.log(Level.INFO, "received event " + e); logger.log(Level.INFO, "received event " + e);
})); }));
} }
@ -39,7 +39,7 @@ public class EventManagerTest {
.register(consumer) .register(consumer)
.build(); .build();
CompletableFuture<GenericEvent> future = new CompletableFuture<>(); CompletableFuture<GenericEvent> future = new CompletableFuture<>();
eventManager.getGenericEventManagerService().post(new DefaultGenericEvent(e -> { eventManager.getGenericEventManagerService().post(new GenericEventImpl(e -> {
logger.log(Level.INFO, "received event " + e); logger.log(Level.INFO, "received event " + e);
future.complete(e); future.complete(e);
})); }));
@ -59,7 +59,7 @@ public class EventManagerTest {
.loadEventConsumers() .loadEventConsumers()
.build(); .build();
CompletableFuture<GenericEvent> future = new CompletableFuture<>(); CompletableFuture<GenericEvent> future = new CompletableFuture<>();
eventManager.getGenericEventManagerService().post(new DefaultGenericEvent(e -> { eventManager.getGenericEventManagerService().post(new GenericEventImpl(e -> {
logger.log(Level.INFO, "received event " + e); logger.log(Level.INFO, "received event " + e);
}), future); }), future);
GenericEvent e = future.get(); GenericEvent e = future.get();
@ -72,7 +72,7 @@ public class EventManagerTest {
} }
@Subscribe @Subscribe
public void onEvent(DefaultGenericEvent event) { public void onEvent(GenericEventImpl event) {
event.received(); event.received();
} }
} }

View file

@ -1,4 +1,4 @@
package org.xbib.event.clock; package org.xbib.event.clock;
public class TestClockEvent extends DefaultClockEvent { public class TestClockEvent extends ClockEventImpl {
} }

View file

@ -1,4 +1,4 @@
package org.xbib.event.path; package org.xbib.event.path;
public class TestFileFollowEvent extends DefaultFileFollowEvent { public class TestFileFollowEvent extends FileFollowEventImpl {
} }

View file

@ -1,4 +1,4 @@
package org.xbib.event.timer; package org.xbib.event.timer;
public class TestTimerEvent extends DefaultTimerEvent { public class TestTimerEvent extends TimerEventImpl {
} }

View file

@ -1,3 +1,6 @@
dependencies { dependencies {
api project(':event-common')
api libs.net.http.server.netty
api libs.net.http.client.netty
implementation libs.datastructures.json.tiny
} }

View file

@ -0,0 +1,13 @@
module org.xbib.event.net.http {
exports org.xbib.event.net.http;
requires org.xbib.event.api;
requires org.xbib.event.common;
requires org.xbib.net.http;
requires org.xbib.net.http.client;
requires org.xbib.net.http.client.netty;
requires org.xbib.net.http.client.netty.secure;
requires org.xbib.net.http.server;
requires org.xbib.net.http.server.netty;
requires org.xbib.net.http.server.netty.secure;
requires java.logging;
}

View file

@ -0,0 +1,44 @@
package org.xbib.event.net.http;
import org.xbib.event.common.EventManager;
import org.xbib.event.common.EventImpl;
import org.xbib.net.http.HttpHeaderNames;
import org.xbib.net.http.HttpHeaderValues;
import org.xbib.net.http.HttpMethod;
import org.xbib.net.http.server.service.BaseHttpService;
import org.xbib.net.http.server.service.HttpService;
import java.nio.charset.StandardCharsets;
import static org.xbib.net.http.HttpResponseStatus.NOT_FOUND;
import static org.xbib.net.http.HttpResponseStatus.OK;
public class EventReceiverService {
private final EventManager eventManager;
public EventReceiverService(EventManager eventManager) {
this.eventManager = eventManager;
}
public HttpService createService(String prefix) {
return BaseHttpService.builder()
.setPrefix(prefix)
.setPath("/event/{type}")
.setMethod(HttpMethod.POST)
.setHandler(ctx -> {
EventImpl event = EventImpl.fromJson(ctx.getRequest().asJson());
if (event.isNullEvent()) {
ctx.status(NOT_FOUND).done();
} else {
eventManager.submit(event);
ctx.status(OK)
.header(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
.charset(StandardCharsets.UTF_8)
.body(event.toJson())
.done();
}
})
.build();
}
}

View file

@ -138,12 +138,12 @@ public class DefaultSyslogMessage implements SyslogMessage {
} }
@Override @Override
public void setKey(String key) { public void setType(String key) {
// ignore // ignore
} }
@Override @Override
public String getKey() { public String getType() {
return builder.messageId; return builder.messageId;
} }

View file

@ -72,7 +72,6 @@ public class MessageEncoder extends MessageToMessageEncoder<Message> {
buffer.writeCharSequence(kvp.getValue().toString(), charset); buffer.writeCharSequence(kvp.getValue().toString(), charset);
index++; index++;
} }
output.add(buffer); output.add(buffer);
} }

View file

@ -18,6 +18,7 @@ dependencyResolutionManagement {
version('gradle', '8.5') version('gradle', '8.5')
version('datastructures', '5.0.6') version('datastructures', '5.0.6')
version('net', '4.0.4') version('net', '4.0.4')
version('net-http', '4.1.0')
version('netty', '4.1.104.Final') version('netty', '4.1.104.Final')
library('netty-handler', 'io.netty', 'netty-handler').versionRef('netty') library('netty-handler', 'io.netty', 'netty-handler').versionRef('netty')
library('net', 'org.xbib', 'net').versionRef('net') library('net', 'org.xbib', 'net').versionRef('net')
@ -26,6 +27,8 @@ dependencyResolutionManagement {
library('settings-api', 'org.xbib', 'settings-api').versionRef('datastructures') library('settings-api', 'org.xbib', 'settings-api').versionRef('datastructures')
library('settings-datastructures-json', 'org.xbib', 'settings-datastructures-json').versionRef('datastructures') library('settings-datastructures-json', 'org.xbib', 'settings-datastructures-json').versionRef('datastructures')
library('time', 'org.xbib', 'time').version('4.0.0') library('time', 'org.xbib', 'time').version('4.0.0')
library('net-http-server-netty', 'org.xbib', 'net-http-server-netty-secure').versionRef('net-http')
library('net-http-client-netty', 'org.xbib', 'net-http-client-netty-secure').versionRef('net-http')
library('reactivestreams', 'org.reactivestreams', 'reactive-streams').version('1.0.3') library('reactivestreams', 'org.reactivestreams', 'reactive-streams').version('1.0.3')
library('rxjava3', 'io.reactivex.rxjava3', 'rxjava').version('3.0.3') library('rxjava3', 'io.reactivex.rxjava3', 'rxjava').version('3.0.3')
} }