refactoring of events and event building

This commit is contained in:
Jörg Prante 2024-01-22 18:50:23 +01:00
parent 420611ed9d
commit c8141c3755
51 changed files with 589 additions and 579 deletions

View file

@ -0,0 +1,4 @@
package org.xbib.event;
public interface ClockEvent extends Event {
}

View file

@ -1,14 +1,44 @@
package org.xbib.event; package org.xbib.event;
import java.util.Map; import java.io.IOException;
import java.nio.file.Path;
import java.time.Instant;
public interface Event { public interface Event {
void setType(String type); boolean isNullEvent();
Listener getListener();
String getType(); String getType();
void setMap(Map<String, Object> map); String getCode();
Map<String, Object> getMap(); String getMessage();
Instant getCreated();
Instant getScheduledFor();
Payload getPayload();
Path getPath();
String getBase();
String getSuffix();
long getFileSize();
void success() throws IOException;
void fail() throws IOException;
String INCOMING = "incoming";
String SUCCESS = "success";
String FAIL = "fail";
String toJson() throws IOException;
} }

View file

@ -0,0 +1,4 @@
package org.xbib.event;
public interface FileFollowEvent extends Event {
}

View file

@ -0,0 +1,7 @@
package org.xbib.event;
@FunctionalInterface
public interface Listener {
void listen(Event event);
}

View file

@ -0,0 +1,4 @@
package org.xbib.event;
public interface PathEvent extends Event {
}

View file

@ -0,0 +1,15 @@
package org.xbib.event;
import java.util.LinkedHashMap;
import java.util.Map;
public class Payload extends LinkedHashMap<String, Object> {
public Payload() {
super();
}
public Payload(Map<String, Object> map) {
super(map);
}
}

View file

@ -0,0 +1,4 @@
package org.xbib.event;
public interface TimerEvent extends Event {
}

View file

@ -1,12 +0,0 @@
package org.xbib.event.clock;
import org.xbib.event.Event;
import java.time.Instant;
public interface ClockEvent extends Event {
void setInstant(Instant instant);
Instant getInstant();
}

View file

@ -1,23 +0,0 @@
package org.xbib.event.clock;
import org.xbib.event.common.EventImpl;
import java.time.Instant;
public class ClockEventImpl extends EventImpl implements ClockEvent {
private Instant instant;
public ClockEventImpl() {
}
@Override
public void setInstant(Instant instant) {
this.instant = instant;
}
@Override
public Instant getInstant() {
return instant;
}
}

View file

@ -31,7 +31,6 @@ public class ClockEventManagerService implements EventManagerService {
public ClockEventManagerService init(EventManager eventManager) { public ClockEventManagerService init(EventManager eventManager) {
Settings settings = eventManager.getSettings(); Settings settings = eventManager.getSettings();
EventBus eventBus = eventManager.getEventBus(); EventBus eventBus = eventManager.getEventBus();
ClassLoader classLoader = eventManager.getClassLoader();
this.suspended = new ArrayList<>(); this.suspended = new ArrayList<>();
ThreadFactory threadFactory = new ThreadFactory() { ThreadFactory threadFactory = new ThreadFactory() {
int n = 1; int n = 1;
@ -43,24 +42,18 @@ public class ClockEventManagerService implements EventManagerService {
ScheduledExecutorService scheduledExecutorService = ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(settings.getAsInt("pool.size", 2), threadFactory); Executors.newScheduledThreadPool(settings.getAsInt("pool.size", 2), threadFactory);
this.cronSchedule = new CronSchedule<>(scheduledExecutorService); this.cronSchedule = new CronSchedule<>(scheduledExecutorService);
for (Map.Entry<String, Settings> clockEventService : settings.getGroups("event.clock").entrySet()) { for (Map.Entry<String, Settings> mapEntry : settings.getGroups("event.clock").entrySet()) {
String name = clockEventService.getKey(); String name = mapEntry.getKey();
Settings entrySettings = clockEventService.getValue(); Settings entrySettings = mapEntry.getValue();
if (entrySettings.getAsBoolean("enabled", true)) { if (entrySettings.getAsBoolean("enabled", true)) {
String entry = entrySettings.get("entry"); String entry = entrySettings.get("entry");
if (entry != null) { if (entry != null) {
try { try {
String className = entrySettings.get("class"); ClockEventService clockEventService = new ClockEventService(this, eventBus, name);
if (className != null) { cronSchedule.add(name, CronExpression.parse(entry), clockEventService);
@SuppressWarnings("unchecked") logger.log(Level.INFO, "cron job " + name + " scheduled on " + entry);
Class<? extends ClockEvent> eventClass = (Class<? extends ClockEvent>) classLoader.loadClass(className);
cronSchedule.add(className, CronExpression.parse(entry), new ClockEventService(this, eventBus, name, eventClass));
logger.log(Level.INFO, "cron job " + clockEventService.getKey() + " scheduled on " + entry + ", event class " + className);
} else {
logger.log(Level.WARNING, "no class specified");
}
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.WARNING, "unable to schedule cron job " + clockEventService.getKey() + ", reason " + e.getMessage()); logger.log(Level.WARNING, "unable to schedule cron job " + mapEntry.getKey() + ", reason " + e.getMessage());
} }
} }
} else { } else {

View file

@ -1,10 +1,12 @@
package org.xbib.event.clock; package org.xbib.event.clock;
import java.time.Instant;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.xbib.event.Event;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.EventBus;
import org.xbib.event.common.EventImpl;
public class ClockEventService implements Callable<Integer> { public class ClockEventService implements Callable<Integer> {
@ -16,16 +18,12 @@ public class ClockEventService implements Callable<Integer> {
private final String name; private final String name;
private final Class<? extends ClockEvent> eventClass;
public ClockEventService(ClockEventManagerService manager, public ClockEventService(ClockEventManagerService manager,
EventBus eventBus, EventBus eventBus,
String name, String name) {
Class<? extends ClockEvent> eventClass) {
this.manager = manager; this.manager = manager;
this.eventBus = eventBus; this.eventBus = eventBus;
this.name = name; this.name = name;
this.eventClass = eventClass;
} }
@Override @Override
@ -35,9 +33,9 @@ public class ClockEventService implements Callable<Integer> {
logger.log(Level.FINE, "clock event " + name + " suspended"); logger.log(Level.FINE, "clock event " + name + " suspended");
return 1; return 1;
} else { } else {
logger.log(Level.FINE, "posting clock event " + eventClass.getName()); Event clockEvent = EventImpl.builder()
ClockEvent clockEvent = eventClass.getDeclaredConstructor().newInstance(); .setType("clock")
clockEvent.setInstant(Instant.now()); .build();
eventBus.post(clockEvent); eventBus.post(clockEvent);
return 0; return 0;
} }

View file

@ -1,8 +1,8 @@
package org.xbib.event.clock; package org.xbib.event.clock;
import org.xbib.event.ClockEvent;
import org.xbib.event.EventConsumer; import org.xbib.event.EventConsumer;
import java.io.IOException;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.xbib.event.bus.AllowConcurrentEvents; import org.xbib.event.bus.AllowConcurrentEvents;
import org.xbib.event.bus.Subscribe; import org.xbib.event.bus.Subscribe;
@ -17,7 +17,7 @@ public class SimpleClockEventConsumer implements EventConsumer {
@Subscribe @Subscribe
@AllowConcurrentEvents @AllowConcurrentEvents
void onEvent(ClockEvent event) { void onEvent(ClockEvent event) {
logger.info("received demo clock event, instant = " + event.getInstant()); logger.info("received demo clock event, created = " + event.getCreated());
} }
} }

View file

@ -0,0 +1,10 @@
package org.xbib.event.common;
import org.xbib.event.ClockEvent;
public class ClockEventImpl extends EventImpl implements ClockEvent {
public ClockEventImpl(EventBuilder builder) {
super(builder);
}
}

View file

@ -0,0 +1,111 @@
package org.xbib.event.common;
import org.xbib.event.Event;
import org.xbib.event.Listener;
import org.xbib.event.Payload;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
public class EventBuilder {
Listener listener;
String type;
String code;
String message;
Instant scheduled;
Instant created;
Payload payload;
Path path;
String base;
String suffix;
long fileSize;
long maxFileSize;
EventBuilder() {
this.maxFileSize = -1L;
}
public EventBuilder setListener(Listener listener) {
this.listener = listener;
return this;
}
public EventBuilder setType(String type) {
this.type = type;
return this;
}
public EventBuilder setCode(String code) {
this.code = code;
return this;
}
public EventBuilder setMessage(String message) {
this.message = message;
return this;
}
public EventBuilder setScheduledFor(Instant scheduled) {
this.scheduled = scheduled;
return this;
}
public EventBuilder setPayload(Payload payload) {
this.payload = payload;
return this;
}
public EventBuilder setMaxFileSize(long maxFileSize) {
this.maxFileSize = maxFileSize;
return this;
}
public EventBuilder setPath(Path path) throws IOException {
this.path = path;
base = getBase(path);
suffix = getSuffix(path);
fileSize = Files.size(path);
if (maxFileSize != -1L && fileSize > maxFileSize) {
throw new IOException("file size too large");
}
return this;
}
public Event build() {
this.created = Instant.now();
return switch (type) {
case "clock" -> new ClockEventImpl(this);
case "timer" -> new TimerEventImpl(this);
case "path" -> new PathEventImpl(this);
case "filefollow" -> new FileFollowEventImpl(this);
case "generic" -> new GenericEventImpl(this, listener);
default -> new EventImpl(this);
};
}
private static String getBase(Path path) {
String name = path.getFileName().toString();
int pos = name.lastIndexOf('.');
return pos >= 0 ? name.substring(0, pos) : name;
}
private static String getSuffix(Path path) {
String name = path.getFileName().toString();
int pos = name.lastIndexOf('.');
return pos >= 0 ? name.substring(pos + 1) : null;
}
}

View file

@ -2,52 +2,124 @@ package org.xbib.event.common;
import org.xbib.datastructures.json.tiny.Json; import org.xbib.datastructures.json.tiny.Json;
import org.xbib.event.Event; import org.xbib.event.Event;
import org.xbib.event.Listener;
import org.xbib.event.Payload;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;
import java.time.Instant;
import java.util.Map; import java.util.Map;
public class EventImpl implements Event { public class EventImpl implements Event {
private String type; private final EventBuilder builder;
private Map<String, Object> map; EventImpl(EventBuilder builder) {
this.builder = builder;
public EventImpl() {
} }
@Override public static EventBuilder builder() {
public void setType(String type) { return new EventBuilder();
this.type = type; }
public Listener getListener() {
return builder.listener;
} }
@Override @Override
public String getType() { public String getType() {
return type; return builder.type;
} }
@Override @Override
public void setMap(Map<String, Object> map) { public String getCode() {
this.map = map; return builder.code;
} }
@Override @Override
public Map<String, Object> getMap() { public String getMessage() {
return map; return builder.message;
} }
public static EventImpl fromJson(String json) { @Override
public Payload getPayload() {
return builder.payload;
}
@Override
public Instant getCreated() {
return builder.created;
}
@Override
public Instant getScheduledFor() {
return builder.scheduled;
}
@Override
public Path getPath() {
return builder.path;
}
@Override
public String getBase() {
return builder.base;
}
@Override
public String getSuffix() {
return builder.suffix;
}
@Override
public long getFileSize() {
return builder.fileSize;
}
public static Event fromFile(Path file) throws IOException {
return fromJson(Files.readString(file));
}
public static Event fromJson(String json) {
Map<String, Object> map = Json.toMap(json); Map<String, Object> map = Json.toMap(json);
EventImpl event = new EventImpl(); return builder()
event.setType((String) map.get("type")); .setType(map.getOrDefault("type", "generic").toString())
event.setMap(map); .setPayload(new Payload(map))
return event; .build();
} }
public String toJson() throws IOException { public String toJson() throws IOException {
return Json.toString(map); return Json.toString(builder.payload);
} }
@Override
public boolean isNullEvent() { public boolean isNullEvent() {
return type == null; return builder.type == null;
}
@Override
public String toString() {
return "Event[path = " + builder.path + ", base = " + builder.base + ", suffix = " + builder.suffix + " payload = " + builder.payload +"]";
}
public void success() throws IOException {
if (builder.path != null) {
Files.setLastModifiedTime(builder.path, FileTime.from(Instant.now()));
Files.move(builder.path, builder.path.getParent().resolve(Event.SUCCESS)
.resolve(builder.path.getFileName()).toAbsolutePath(),
StandardCopyOption.REPLACE_EXISTING);
}
}
public void fail() throws IOException {
Files.setLastModifiedTime(builder.path, FileTime.from(Instant.now()));
Files.move(builder.path, builder.path.getParent().resolve(Event.FAIL)
.resolve(builder.path.getFileName()).toAbsolutePath(),
StandardCopyOption.REPLACE_EXISTING);
} }
} }

View file

@ -73,8 +73,9 @@ public final class EventManager {
getGenericEventManagerService().post(event); getGenericEventManagerService().post(event);
} }
public <T> EventManagerService getEventManagerService(Class<T> cl) { @SuppressWarnings("unchecked")
return eventManagerServices.get(cl); public <T extends EventManagerService> T getEventManagerService(Class<T> cl) {
return (T) eventManagerServices.get(cl);
} }
public GenericEventManagerService getGenericEventManagerService() { public GenericEventManagerService getGenericEventManagerService() {

View file

@ -0,0 +1,10 @@
package org.xbib.event.common;
import org.xbib.event.FileFollowEvent;
public class FileFollowEventImpl extends EventImpl implements FileFollowEvent {
public FileFollowEventImpl(EventBuilder builder) {
super(builder);
}
}

View file

@ -0,0 +1,29 @@
package org.xbib.event.common;
import org.xbib.event.Listener;
import java.util.Objects;
public class GenericEventImpl extends EventImpl {
private final EventBuilder builder;
public GenericEventImpl(EventBuilder builder) {
this(builder, null);
}
public GenericEventImpl(EventBuilder builder, Listener listener) {
super(builder);
this.builder = builder;
this.builder.listener = Objects.requireNonNull(listener);
}
public GenericEventImpl setListener(Listener listener) {
this.builder.listener = Objects.requireNonNull(listener);
return this;
}
public void received() {
builder.listener.listen(this);
}
}

View file

@ -0,0 +1,10 @@
package org.xbib.event.common;
import org.xbib.event.PathEvent;
public class PathEventImpl extends EventImpl implements PathEvent {
public PathEventImpl(EventBuilder builder) {
super(builder);
}
}

View file

@ -0,0 +1,10 @@
package org.xbib.event.common;
import org.xbib.event.TimerEvent;
public class TimerEventImpl extends EventImpl implements TimerEvent {
public TimerEventImpl(EventBuilder builder) {
super(builder);
}
}

View file

@ -1,6 +0,0 @@
package org.xbib.event.generic;
import org.xbib.event.Event;
public interface GenericEvent extends Event {
}

View file

@ -1,31 +0,0 @@
package org.xbib.event.generic;
import org.xbib.event.common.EventImpl;
public class GenericEventImpl extends EventImpl implements GenericEvent {
private Listener listener;
public GenericEventImpl() {
this(null);
}
public GenericEventImpl(Listener listener) {
this.listener = listener;
}
public GenericEventImpl setListener(Listener listener) {
this.listener = listener;
return this;
}
public Listener getListener() {
return listener;
}
public void received() {
if (listener != null) {
listener.listen(this);
}
}
}

View file

@ -3,15 +3,22 @@ package org.xbib.event.generic;
import java.io.IOException; import java.io.IOException;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.xbib.event.Event;
import org.xbib.event.Listener;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.EventBus;
import org.xbib.event.bus.Subscriber; import org.xbib.event.bus.Subscriber;
import org.xbib.event.bus.SubscriberRegistry; import org.xbib.event.bus.SubscriberRegistry;
import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManager;
import org.xbib.event.common.EventManagerService; import org.xbib.event.common.EventManagerService;
import org.xbib.event.common.GenericEventImpl;
public class GenericEventManagerService implements EventManagerService { public class GenericEventManagerService implements EventManagerService {
private static final Logger logger = Logger.getLogger(GenericEventManagerService.class.getName());
private EventBus eventBus; private EventBus eventBus;
public GenericEventManagerService() { public GenericEventManagerService() {
@ -33,9 +40,10 @@ public class GenericEventManagerService implements EventManagerService {
} }
public void post(GenericEventImpl event, public void post(GenericEventImpl event,
CompletableFuture<GenericEvent> future) { CompletableFuture<Event> future) {
SubscriberRegistry subscriberRegistry = eventBus.getSubscribers(); SubscriberRegistry subscriberRegistry = eventBus.getSubscribers();
Set<Subscriber> set = subscriberRegistry.getSubscribersForTesting(event.getClass()); Set<Subscriber> set = subscriberRegistry.getSubscribersForTesting(event.getClass());
logger.log(Level.INFO, "set = " + set);
event.setListener(new WrappedListener(event.getListener(), set.size(), future)); event.setListener(new WrappedListener(event.getListener(), set.size(), future));
post(event); post(event);
} }
@ -46,18 +54,20 @@ public class GenericEventManagerService implements EventManagerService {
private int size; private int size;
private final CompletableFuture<GenericEvent> future; private final CompletableFuture<Event> future;
public WrappedListener(Listener listener, int size, CompletableFuture<GenericEvent> future) { public WrappedListener(Listener listener, int size, CompletableFuture<Event> future) {
this.listener = listener; this.listener = listener;
this.size = size; this.size = size;
this.future = future; this.future = future;
} }
@Override @Override
public void listen(GenericEvent event) { public void listen(Event event) {
if (listener != null) { if (listener != null) {
listener.listen(event); listener.listen(event);
} else {
logger.log(Level.WARNING, "listener not set");
} }
if (--size == 0) { if (--size == 0) {
future.complete(event); future.complete(event);

View file

@ -1,7 +0,0 @@
package org.xbib.event.generic;
@FunctionalInterface
public interface Listener {
void listen(GenericEvent event);
}

View file

@ -1,16 +0,0 @@
package org.xbib.event.path;
import org.xbib.event.Event;
import java.nio.file.Path;
public interface FileFollowEvent extends Event {
void setPath(Path path);
Path getPath();
void setContent(String content);
String getContent();
}

View file

@ -1,35 +0,0 @@
package org.xbib.event.path;
import org.xbib.event.common.EventImpl;
import java.nio.file.Path;
public class FileFollowEventImpl extends EventImpl implements FileFollowEvent {
private Path path;
private String content;
public FileFollowEventImpl() {
}
@Override
public void setPath(Path path) {
this.path = path;
}
@Override
public Path getPath() {
return path;
}
@Override
public void setContent(String content) {
this.content = content;
}
@Override
public String getContent() {
return content;
}
}

View file

@ -5,7 +5,6 @@ import org.xbib.event.common.EventManager;
import org.xbib.event.common.EventManagerService; import org.xbib.event.common.EventManagerService;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Path; import java.nio.file.Path;
import java.nio.file.Paths; import java.nio.file.Paths;
@ -27,12 +26,10 @@ public class FileFollowEventManagerService implements EventManagerService {
public FileFollowEventManagerService() { public FileFollowEventManagerService() {
} }
@SuppressWarnings("unchecked")
@Override @Override
public FileFollowEventManagerService init(EventManager eventManager) { public FileFollowEventManagerService init(EventManager eventManager) {
Settings settings = eventManager.getSettings(); Settings settings = eventManager.getSettings();
EventBus eventBus = eventManager.getEventBus(); EventBus eventBus = eventManager.getEventBus();
ClassLoader classLoader = eventManager.getClassLoader();
ExecutorService executorService = eventManager.getExecutorService(); ExecutorService executorService = eventManager.getExecutorService();
this.eventServiceMap = new LinkedHashMap<>(); this.eventServiceMap = new LinkedHashMap<>();
for (Map.Entry<String, Settings> entry : settings.getGroups("event.filefollow").entrySet()) { for (Map.Entry<String, Settings> entry : settings.getGroups("event.filefollow").entrySet()) {
@ -45,12 +42,10 @@ public class FileFollowEventManagerService implements EventManagerService {
try { try {
Path base = Paths.get(baseStr); Path base = Paths.get(baseStr);
Pattern pattern = Pattern.compile(patternStr); Pattern pattern = Pattern.compile(patternStr);
String className = definition.get("class", FileFollowEventImpl.class.getName()); FileFollowEventService fileFollowEventService = new FileFollowEventService(definition, eventBus, base, pattern);
Class<? extends FileFollowEvent> eventClass = (Class<? extends FileFollowEvent>) classLoader.loadClass(className);
FileFollowEventService fileFollowEventService = new FileFollowEventService(definition, eventBus, base, pattern, eventClass);
Future<?> future = executorService.submit(fileFollowEventService); Future<?> future = executorService.submit(fileFollowEventService);
eventServiceMap.put(future, fileFollowEventService); eventServiceMap.put(future, fileFollowEventService);
logger.log(Level.INFO, "file follow service " + entry.getKey() + " with base " + base + " and pattern " + pattern + " added, event class " + className); logger.log(Level.INFO, "file follow service " + entry.getKey() + " with base " + base + " and pattern " + pattern + " added");
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.SEVERE, "unable to create file follow service " + entry.getKey() + ", reason " + e.getMessage(), e); logger.log(Level.SEVERE, "unable to create file follow service " + entry.getKey() + ", reason " + e.getMessage(), e);
} }

View file

@ -1,6 +1,8 @@
package org.xbib.event.path; package org.xbib.event.path;
import org.xbib.event.Event;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.EventBus;
import org.xbib.event.common.EventImpl;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
import java.io.Closeable; import java.io.Closeable;
@ -37,8 +39,6 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
private final Pattern pattern; private final Pattern pattern;
private final Class<? extends FileFollowEvent> eventClass;
private final WatchService watchService; private final WatchService watchService;
private final Map<Path, Long> fileSizes; private final Map<Path, Long> fileSizes;
@ -48,12 +48,10 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
public FileFollowEventService(Settings settings, public FileFollowEventService(Settings settings,
EventBus eventBus, EventBus eventBus,
Path base, Path base,
Pattern pattern, Pattern pattern) throws IOException {
Class<? extends FileFollowEvent> eventClass) throws IOException {
this.eventBus = eventBus; this.eventBus = eventBus;
this.base = base; this.base = base;
this.pattern = pattern; this.pattern = pattern;
this.eventClass = eventClass;
FileSystem fileSystem = base.getFileSystem(); FileSystem fileSystem = base.getFileSystem();
this.watchService = fileSystem.newWatchService(); this.watchService = fileSystem.newWatchService();
WatchEvent.Kind<?>[] kinds = new WatchEvent.Kind<?>[1]; WatchEvent.Kind<?>[] kinds = new WatchEvent.Kind<?>[1];
@ -91,10 +89,12 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
String content = readRange(channel, lastSize, currentSize); String content = readRange(channel, lastSize, currentSize);
// split content by line, this allows pattern matching without preprocessing in worker // split content by line, this allows pattern matching without preprocessing in worker
for (String line : content.split("\n")) { for (String line : content.split("\n")) {
FileFollowEvent event = eventClass.getDeclaredConstructor().newInstance(); Event event = EventImpl.builder()
event.setType(base.toString()); .setType("filefollow")
event.setPath(path); .setCode(base.toString())
event.setContent(line); .setPath(path)
.setMessage(line)
.build();
eventBus.post(event); eventBus.post(event);
} }
} }

View file

@ -1,26 +0,0 @@
package org.xbib.event.path;
import org.xbib.event.Event;
import java.io.IOException;
import java.nio.file.Path;
public interface PathEvent extends Event {
void setPath(Path path);
Path getPath();
void setFile(Path file);
Path getFile();
void setSuffix(String suffix);
String getSuffix();
void success() throws IOException;
void fail() throws IOException;
}

View file

@ -1,66 +0,0 @@
package org.xbib.event.path;
import org.xbib.event.common.EventImpl;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;
import java.time.Instant;
public class PathEventImpl extends EventImpl implements PathEvent {
private Path path;
private Path file;
private String suffix;
@Override
public void setPath(Path path) {
this.path = path;
}
@Override
public Path getPath() {
return path;
}
@Override
public void setFile(Path file) {
this.file = file;
}
@Override
public Path getFile() {
return file;
}
@Override
public void setSuffix(String suffix) {
this.suffix = suffix;
}
@Override
public String getSuffix() {
return suffix;
}
public void success() throws IOException {
Files.setLastModifiedTime(file, FileTime.from(Instant.now()));
Files.move(file, path.resolve(PathEventService.SUCCESS).resolve(file.getFileName()).toAbsolutePath(),
StandardCopyOption.REPLACE_EXISTING);
}
public void fail() throws IOException {
Files.setLastModifiedTime(file, FileTime.from(Instant.now()));
Files.move(file, path.resolve(PathEventService.FAIL).resolve(file.getFileName()).toAbsolutePath(),
StandardCopyOption.REPLACE_EXISTING);
}
@Override
public String toString() {
return "path=" + path + " file=" + file + " suffix=" + suffix + " map=" + getMap();
}
}

View file

@ -2,6 +2,7 @@ package org.xbib.event.path;
import org.xbib.datastructures.api.TimeValue; import org.xbib.datastructures.api.TimeValue;
import org.xbib.datastructures.json.tiny.Json; import org.xbib.datastructures.json.tiny.Json;
import org.xbib.event.Event;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.EventBus;
import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManager;
import org.xbib.event.common.EventManagerService; import org.xbib.event.common.EventManagerService;
@ -55,12 +56,9 @@ public class PathEventManagerService implements EventManagerService {
String name = entry.getKey(); String name = entry.getKey();
Settings definition = entry.getValue(); Settings definition = entry.getValue();
if (definition.getAsBoolean("enabled", true)) { if (definition.getAsBoolean("enabled", true)) {
int maxBytes = definition.getAsInt("maxfilesize", 10 * 1024 * 1024); // 10 MB
TimeValue lifetime = definition.getAsTime("lifetime", TimeValue.timeValueHours(72)); TimeValue lifetime = definition.getAsTime("lifetime", TimeValue.timeValueHours(72));
String className = definition.get("class", PathEventImpl.class.getName());
Class<? extends PathEvent> eventClass = (Class<? extends PathEvent>) classLoader.loadClass(className);
Path p = path.resolve(name); Path p = path.resolve(name);
createPathEventService(name, p, maxBytes, lifetime, eventClass); createPathEventService(name, p, lifetime);
} else { } else {
logger.log(Level.WARNING, "path servive definition not enabled in configuration"); logger.log(Level.WARNING, "path servive definition not enabled in configuration");
} }
@ -73,12 +71,10 @@ public class PathEventManagerService implements EventManagerService {
public void createPathEventService(String name, public void createPathEventService(String name,
Path path, Path path,
int maxBytes, TimeValue lifetime)
TimeValue lifetime,
Class<? extends PathEvent> eventClass)
throws IOException { throws IOException {
createQueue(name, path); createQueue(name, path);
PathEventService pathEventService = new PathEventService(this, eventBus, name, path, maxBytes, lifetime, eventClass); PathEventService pathEventService = new PathEventService(this, eventBus, name, path, lifetime);
add(pathEventService); add(pathEventService);
} }
@ -128,12 +124,12 @@ public class PathEventManagerService implements EventManagerService {
public boolean put(String queue, String key, String suffix, String string) throws IOException { public boolean put(String queue, String key, String suffix, String string) throws IOException {
String keyFileName = key + suffix; String keyFileName = key + suffix;
Path queuePath = path.resolve(queue); Path queuePath = path.resolve(queue);
if (Files.exists(queuePath.resolve(PathEventService.INCOMING).resolve(keyFileName)) || if (Files.exists(queuePath.resolve(Event.INCOMING).resolve(keyFileName)) ||
Files.exists(queuePath.resolve(PathEventService.SUCCESS).resolve(keyFileName))) { Files.exists(queuePath.resolve(Event.SUCCESS).resolve(keyFileName))) {
logger.log(Level.WARNING, "key " + key + " already exists"); logger.log(Level.WARNING, "key " + key + " already exists");
return false; return false;
} }
Path p = queuePath.resolve(PathEventService.INCOMING).resolve(keyFileName); Path p = queuePath.resolve(Event.INCOMING).resolve(keyFileName);
try (Writer writer = Files.newBufferedWriter(p)) { try (Writer writer = Files.newBufferedWriter(p)) {
writer.write(string); writer.write(string);
} }
@ -153,20 +149,20 @@ public class PathEventManagerService implements EventManagerService {
public boolean exists(String queue, String key, String suffix) { public boolean exists(String queue, String key, String suffix) {
String keyFileName = key + suffix; String keyFileName = key + suffix;
Path queuePath = path.resolve(queue); Path queuePath = path.resolve(queue);
return Files.exists(queuePath.resolve(PathEventService.INCOMING).resolve(keyFileName)) || return Files.exists(queuePath.resolve(Event.INCOMING).resolve(keyFileName)) ||
Files.exists(queuePath.resolve(PathEventService.SUCCESS).resolve(keyFileName)); Files.exists(queuePath.resolve(Event.SUCCESS).resolve(keyFileName));
} }
public long sizeOfIncoming(String queue) throws IOException { public long sizeOfIncoming(String queue) throws IOException {
return sizeOf(path.resolve(queue).resolve(PathEventService.INCOMING)); return sizeOf(path.resolve(queue).resolve(Event.INCOMING));
} }
public long sizeOfSuccess(String queue) throws IOException { public long sizeOfSuccess(String queue) throws IOException {
return sizeOf(path.resolve(queue).resolve(PathEventService.SUCCESS)); return sizeOf(path.resolve(queue).resolve(Event.SUCCESS));
} }
public long sizeOfFail(String queue) throws IOException { public long sizeOfFail(String queue) throws IOException {
return sizeOf(path.resolve(queue).resolve(PathEventService.FAIL)); return sizeOf(path.resolve(queue).resolve(Event.FAIL));
} }
public static long sizeOf(Path path) throws IOException { public static long sizeOf(Path path) throws IOException {
@ -179,7 +175,7 @@ public class PathEventManagerService implements EventManagerService {
if (!Files.exists(p)) { if (!Files.exists(p)) {
logger.log(Level.FINE, "creating queue " + name + " at " + p); logger.log(Level.FINE, "creating queue " + name + " at " + p);
Files.createDirectories(p); Files.createDirectories(p);
for (String s: List.of(PathEventService.INCOMING, PathEventService.SUCCESS, PathEventService.FAIL)) { for (String s: List.of(Event.INCOMING, Event.SUCCESS, Event.FAIL)) {
Path dir = p.resolve(s); Path dir = p.resolve(s);
if (!Files.exists(dir)) { if (!Files.exists(dir)) {
logger.log(Level.FINE, "creating queue " + name + " dir " + dir); logger.log(Level.FINE, "creating queue " + name + " dir " + dir);

View file

@ -1,8 +1,9 @@
package org.xbib.event.path; package org.xbib.event.path;
import org.xbib.datastructures.api.TimeValue; import org.xbib.datastructures.api.TimeValue;
import org.xbib.datastructures.json.tiny.Json; import org.xbib.event.Event;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.EventBus;
import org.xbib.event.common.EventImpl;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
@ -26,11 +27,6 @@ public class PathEventService implements Callable<Integer>, Closeable {
private static final Logger logger = Logger.getLogger(PathEventService.class.getName()); private static final Logger logger = Logger.getLogger(PathEventService.class.getName());
public static final String INCOMING = "incoming";
public static final String SUCCESS = "success";
public static final String FAIL = "fail";
private final PathEventManagerService pathEventManager; private final PathEventManagerService pathEventManager;
@ -40,11 +36,7 @@ public class PathEventService implements Callable<Integer>, Closeable {
private final String name; private final String name;
private final int maxFileSize;
private final TimeValue lifetime; private final TimeValue lifetime;
;
private final Class<? extends PathEvent> pathEventClass;
private final WatchService watchService; private final WatchService watchService;
@ -56,22 +48,18 @@ public class PathEventService implements Callable<Integer>, Closeable {
EventBus eventBus, EventBus eventBus,
String name, String name,
Path path, Path path,
int maxFileSize, TimeValue lifetime) throws IOException {
TimeValue lifetime,
Class<? extends PathEvent> pathEventClass) throws IOException {
this.pathEventManager = pathEventManager; this.pathEventManager = pathEventManager;
this.eventBus = eventBus; this.eventBus = eventBus;
this.name = name; this.name = name;
this.path = path; this.path = path;
this.maxFileSize = maxFileSize;
this.lifetime = lifetime; this.lifetime = lifetime;
this.pathEventClass = pathEventClass;
drainIncoming(); drainIncoming();
this.watchService = path.getFileSystem().newWatchService(); this.watchService = path.getFileSystem().newWatchService();
WatchEvent.Kind<?>[] kinds = new WatchEvent.Kind[] { StandardWatchEventKinds.ENTRY_CREATE }; WatchEvent.Kind<?>[] kinds = new WatchEvent.Kind[] { StandardWatchEventKinds.ENTRY_CREATE };
WatchKey watchKey = path.resolve(INCOMING).register(watchService, kinds); WatchKey watchKey = path.resolve(Event.INCOMING).register(watchService, kinds);
keepWatching = true; keepWatching = true;
logger.log(Level.INFO, "path event service created for incoming files at " + path + " max file size = " + maxFileSize); logger.log(Level.INFO, "path event service created for incoming files at " + path);
} }
public String getName() { public String getName() {
@ -83,7 +71,7 @@ public class PathEventService implements Callable<Integer>, Closeable {
@Override @Override
public Integer call() { public Integer call() {
try { try {
logger.log(Level.INFO, "watch service running on " + path.resolve(INCOMING)); logger.log(Level.INFO, "watch service running on " + path.resolve(Event.INCOMING));
while (keepWatching && watchService != null) { while (keepWatching && watchService != null) {
WatchKey watchKey = watchService.take(); WatchKey watchKey = watchService.take();
logger.log(Level.FINE, "received a watch key " + watchKey); logger.log(Level.FINE, "received a watch key " + watchKey);
@ -96,12 +84,12 @@ public class PathEventService implements Callable<Integer>, Closeable {
Thread.sleep(1000L); Thread.sleep(1000L);
WatchEvent<Path> pathWatchEvent = (WatchEvent<Path>) watchEvent; WatchEvent<Path> pathWatchEvent = (WatchEvent<Path>) watchEvent;
String watchEventContext = pathWatchEvent.context().toString(); String watchEventContext = pathWatchEvent.context().toString();
Path p = path.resolve(INCOMING).resolve(watchEventContext); Path p = path.resolve(Event.INCOMING).resolve(watchEventContext);
logger.log(Level.FINE, "watch event " + pathWatchEvent + " context = " + watchEventContext + " path = " + p); logger.log(Level.FINE, "watch event " + pathWatchEvent + " context = " + watchEventContext + " path = " + p);
if (pathEventManager.getSuspendedQueues().contains(name)) { if (pathEventManager.getSuspendedQueues().contains(name)) {
failEvent(watchEventContext, p); failEvent(p);
} else { } else {
postEvent(watchEventContext, p); postEvent(p);
} }
} }
watchKey.reset(); watchKey.reset();
@ -127,14 +115,8 @@ public class PathEventService implements Callable<Integer>, Closeable {
} }
public void drainIncoming() throws IOException { public void drainIncoming() throws IOException {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path.resolve(INCOMING))) { try (DirectoryStream<Path> stream = Files.newDirectoryStream(path.resolve(Event.INCOMING))) {
stream.forEach(path -> { stream.forEach(this::postEvent);
if (Files.isRegularFile(path)) {
String key = path.getFileName().toString();
logger.log(Level.INFO, "while draining found key = " + key + " path = " + path);
postEvent(key, path);
}
});
} }
} }
@ -163,67 +145,22 @@ public class PathEventService implements Callable<Integer>, Closeable {
} }
} }
private void postEvent(String key, Path file) { private void postEvent(Path file) {
PathEvent event = toEvent(key, file); try {
if (event != null) { Event event = EventImpl.fromFile(file);
logger.log(Level.FINE, "posting new event = " + event.getClass());
eventBus.post(event); eventBus.post(event);
eventCount++; eventCount++;
} catch (IOException e) {
logger.log(Level.SEVERE, "ignoring event post because of " + e.getMessage());
} }
} }
private void failEvent(String key, Path file) throws IOException { private void failEvent(Path file) {
PathEvent event = toEvent(key, file); try {
if (event != null) { Event event = EventImpl.fromFile(file);
logger.log(Level.WARNING, "queue " + name + " suspended, event short-circuited to fail");
event.fail(); event.fail();
} } catch (IOException e) {
} logger.log(Level.SEVERE, "unable to fail event because of " + e.getMessage());
private static String getBase(String name) {
int pos = name.lastIndexOf('.');
return pos >= 0 ? name.substring(0, pos) : name;
}
private static String getSuffix(String name) {
int pos = name.lastIndexOf('.');
return pos >= 0 ? name.substring(pos + 1) : null;
}
private PathEvent toEvent(String key, Path file) {
try {
String base = getBase(key);
String suffix = getSuffix(key);
long fileSize = Files.size(file);
if (fileSize > maxFileSize) {
logger.log(Level.SEVERE, "event object ignored, too large");
return null;
}
if ("!json".equals(suffix)) {
logger.log(Level.SEVERE, "event object ignored, no json suffix");
return null;
}
String json = Files.readString(file);
return toEvent(base, file, suffix, json);
} catch (Exception e) {
logger.log(Level.SEVERE, e.getMessage(), e);
}
logger.log(Level.SEVERE, "event object could not be created");
return null;
}
private PathEvent toEvent(String base, Path file, String suffix, String json) {
try {
PathEvent event = pathEventClass.getConstructor().newInstance();
event.setType(base);
event.setFile(file);
event.setSuffix(suffix);
event.setPath(path); // remember directory for fail() and success()
event.setMap(Json.toMap(json));
return event;
} catch (Exception e) {
logger.log(Level.SEVERE, e.getMessage(), e);
return null;
} }
} }
} }

View file

@ -1,12 +0,0 @@
package org.xbib.event.timer;
import org.xbib.event.Event;
import java.time.Instant;
public interface TimerEvent extends Event {
void setInstant(Instant instant);
Instant getInstant();
}

View file

@ -1,23 +0,0 @@
package org.xbib.event.timer;
import org.xbib.event.common.EventImpl;
import java.time.Instant;
public class TimerEventImpl extends EventImpl implements TimerEvent {
private Instant instant;
public TimerEventImpl() {
}
@Override
public void setInstant(Instant instant) {
this.instant = instant;
}
@Override
public Instant getInstant() {
return instant;
}
}

View file

@ -1,5 +1,6 @@
package org.xbib.event.timer; package org.xbib.event.timer;
import org.xbib.event.Payload;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.EventBus;
import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManager;
import org.xbib.event.common.EventManagerService; import org.xbib.event.common.EventManagerService;
@ -38,12 +39,11 @@ public class TimerEventManagerService implements EventManagerService {
for (Map.Entry<String, Settings> entry : settings.getGroups("event.timer").entrySet()) { for (Map.Entry<String, Settings> entry : settings.getGroups("event.timer").entrySet()) {
String name = entry.getKey(); String name = entry.getKey();
Settings timerSettings = entry.getValue(); Settings timerSettings = entry.getValue();
String className = timerSettings.get("class", TimerEvent.class.getName());
try { try {
Class<? extends TimerEvent> eventClass = (Class<? extends TimerEvent>) classLoader.loadClass(className);
PersistenceStore<String, Object> persistenceStore = new FilePersistenceStore(timerSettings, name); PersistenceStore<String, Object> persistenceStore = new FilePersistenceStore(timerSettings, name);
services.put(name, new TimerEventService(eventBus, name, eventClass, ZoneId.systemDefault(), persistenceStore)); TimerEventService timerEventService = new TimerEventService(eventBus, name, ZoneId.systemDefault(), persistenceStore);
logger.log(Level.INFO, "timer " + name + " active for timer event class " + className); services.put(name, timerEventService);
logger.log(Level.INFO, "timer " + name + " active: " + timerEventService);
} catch (Exception e) { } catch (Exception e) {
logger.log(Level.WARNING, "unable to activate timer " + name + ", reason: " + e.getMessage(), e); logger.log(Level.WARNING, "unable to activate timer " + name + ", reason: " + e.getMessage(), e);
} }
@ -53,12 +53,12 @@ public class TimerEventManagerService implements EventManagerService {
public boolean put(String service, public boolean put(String service,
String timeSpec, String timeSpec,
Map<String, Object> map) throws ParseException, IOException { Payload payload) throws ParseException, IOException {
if (services.containsKey(service)) { if (services.containsKey(service)) {
Span span = Chronic.parse(timeSpec); Span span = Chronic.parse(timeSpec);
if (span != null) { if (span != null) {
ZonedDateTime zonedDateTime = span.getBeginCalendar(); ZonedDateTime zonedDateTime = span.getBeginCalendar();
services.get(service).schedule(zonedDateTime.toInstant(), map); services.get(service).schedule(zonedDateTime.toInstant(), payload);
logger.log(Level.INFO, "scheduled to " + zonedDateTime); logger.log(Level.INFO, "scheduled to " + zonedDateTime);
} else { } else {
logger.log(Level.INFO, "timer event key " + service + ": can not understand time spec " + timeSpec); logger.log(Level.INFO, "timer event key " + service + ": can not understand time spec " + timeSpec);
@ -73,9 +73,9 @@ public class TimerEventManagerService implements EventManagerService {
public boolean put(String service, public boolean put(String service,
Instant instant, Instant instant,
Map<String, Object> map) throws IOException { Payload payload) throws IOException {
if (services.containsKey(service)) { if (services.containsKey(service)) {
services.get(service).schedule(instant, map); services.get(service).schedule(instant, payload);
return true; return true;
} else { } else {
logger.log(Level.SEVERE, "unknown timer event key: " + service); logger.log(Level.SEVERE, "unknown timer event key: " + service);

View file

@ -1,18 +1,19 @@
package org.xbib.event.timer; package org.xbib.event.timer;
import org.xbib.event.Event;
import org.xbib.event.Payload;
import org.xbib.event.bus.EventBus; import org.xbib.event.bus.EventBus;
import org.xbib.event.common.EventImpl;
import org.xbib.event.persistence.PersistenceStore; import org.xbib.event.persistence.PersistenceStore;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.time.Instant; import java.time.Instant;
import java.time.ZoneId; import java.time.ZoneId;
import java.time.ZonedDateTime; import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Date; import java.util.Date;
import java.util.LinkedHashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects; import java.util.Objects;
@ -27,8 +28,6 @@ class TimerEventService implements Closeable {
private final EventBus eventBus; private final EventBus eventBus;
private final Class<? extends TimerEvent> eventClass;
private final ZoneId zoneId; private final ZoneId zoneId;
private final PersistenceStore<String, Object> persistenceStore; private final PersistenceStore<String, Object> persistenceStore;
@ -37,11 +36,9 @@ class TimerEventService implements Closeable {
public TimerEventService(EventBus eventBus, public TimerEventService(EventBus eventBus,
String name, String name,
Class<? extends TimerEvent> eventClass,
ZoneId zoneId, ZoneId zoneId,
PersistenceStore<String, Object> persistenceStore) throws IOException { PersistenceStore<String, Object> persistenceStore) throws IOException {
this.eventBus = eventBus; this.eventBus = eventBus;
this.eventClass = eventClass;
this.zoneId = zoneId; this.zoneId = zoneId;
this.persistenceStore = persistenceStore; this.persistenceStore = persistenceStore;
this.timer = new Timer(); this.timer = new Timer();
@ -49,13 +46,12 @@ class TimerEventService implements Closeable {
logger.log(Level.INFO, "timer event service " + name + " loaded and rescheduled, " + tasknum + " timer tasks"); logger.log(Level.INFO, "timer event service " + name + " loaded and rescheduled, " + tasknum + " timer tasks");
} }
void schedule(Instant instant, Map<String, Object> map) throws IOException { void schedule(Instant instant, Payload payload) throws IOException {
ZonedDateTime zonedDateTime = instant.atZone(zoneId); String scheduled = instant.atZone(zoneId).format(DateTimeFormatter.ISO_DATE_TIME);
Map<String, Object> task = new LinkedHashMap<>(map); payload.put("scheduled", scheduled);
task.put("scheduled", zonedDateTime.format(DateTimeFormatter.ISO_DATE_TIME)); TimerEventTask timerEventTask = new TimerEventTask(payload);
TimerEventTask timerEventTask = new TimerEventTask(task);
timer.schedule(timerEventTask, Date.from(instant)); timer.schedule(timerEventTask, Date.from(instant));
logger.log(Level.INFO, "new task " + map + " added, scheduled at " + instant); logger.log(Level.INFO, "new task " + payload + " added, scheduled at " + scheduled);
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@ -66,11 +62,12 @@ class TimerEventService implements Closeable {
persistenceStore.clear(); persistenceStore.clear();
persistenceStore.commit(); persistenceStore.commit();
for (Map<String, Object> task : tasks) { for (Map<String, Object> task : tasks) {
Payload payload = new Payload(task);
ZonedDateTime scheduledDate = ZonedDateTime.parse((String) task.get("scheduled"), DateTimeFormatter.ISO_DATE_TIME); ZonedDateTime scheduledDate = ZonedDateTime.parse((String) task.get("scheduled"), DateTimeFormatter.ISO_DATE_TIME);
if (scheduledDate.isBefore(ZonedDateTime.now())) { if (scheduledDate.isBefore(ZonedDateTime.now())) {
logger.log(Level.WARNING, "scheduled timer task " + task + " date already passed"); logger.log(Level.WARNING, "scheduled timer task " + task + " date already passed");
} else { } else {
schedule(scheduledDate.toInstant(), task); schedule(scheduledDate.toInstant(), payload);
} }
} }
tasks = (List<Map<String, Object>>) persistenceStore.getOrDefault("tasks", new ArrayList<>()); tasks = (List<Map<String, Object>>) persistenceStore.getOrDefault("tasks", new ArrayList<>());
@ -90,45 +87,45 @@ class TimerEventService implements Closeable {
public class TimerEventTask extends TimerTask { public class TimerEventTask extends TimerTask {
private final Map<String, Object> map; private final Payload payload;
public TimerEventTask(Map<String, Object> map) throws IOException { public TimerEventTask(Payload payload) throws IOException {
this.map = map; this.payload = payload;
persistenceStore.insert("tasks", this.map); persistenceStore.insert("tasks", this.payload);
} }
@Override @Override
public void run() { public void run() {
TimerEvent timerEvent;
try { try {
timerEvent = eventClass.getDeclaredConstructor().newInstance(); Event timerEvent = EventImpl.builder()
timerEvent.setInstant(Instant.now()); .setType("timer")
timerEvent.setMap(map); .setPayload(payload)
logger.log(Level.FINE, "posting timer event " + timerEvent.getClass().getName() + " map = " + map); .build();
logger.log(Level.FINE, "posting timer event " + timerEvent.getClass().getName() + " with payload = " + payload);
eventBus.post(timerEvent); eventBus.post(timerEvent);
logger.log(Level.FINE, "persistence before remove: " + persistenceStore.get("tasks")); logger.log(Level.FINE, "persistence before remove: " + persistenceStore.get("tasks"));
if (persistenceStore.remove("tasks", this.map)) { if (persistenceStore.remove("tasks", this.payload)) {
logger.log(Level.FINE, "removal done"); logger.log(Level.FINE, "removal done");
} }
logger.log(Level.FINE, "persistence after remove: " + persistenceStore.get("tasks")); logger.log(Level.FINE, "persistence after remove: " + persistenceStore.get("tasks"));
} catch (InstantiationException | IllegalAccessException | InvocationTargetException | NoSuchMethodException | IOException e) { } catch (IOException e) {
logger.log(Level.SEVERE, e.getMessage(), e); logger.log(Level.SEVERE, e.getMessage(), e);
} }
} }
@Override @Override
public String toString() { public String toString() {
return map.toString(); return payload.toString();
} }
@Override @Override
public int hashCode() { public int hashCode() {
return map.hashCode(); return payload.hashCode();
} }
@Override @Override
public boolean equals(Object object) { public boolean equals(Object object) {
return object instanceof TimerEventTask && Objects.equals(map, ((TimerEventTask) object).map); return object instanceof TimerEventTask && Objects.equals(payload, ((TimerEventTask) object).payload);
} }
} }
} }

View file

@ -1,79 +0,0 @@
package org.xbib.event;
import org.junit.jupiter.api.Test;
import org.xbib.event.bus.Subscribe;
import org.xbib.event.common.EventManager;
import org.xbib.event.generic.GenericEventImpl;
import org.xbib.event.generic.GenericEvent;
import org.xbib.settings.Settings;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
public class EventManagerTest {
private static final Logger logger = Logger.getLogger(EventManagerTest.class.getName());
@Test
void testGenericEvents() {
Settings settings = Settings.settingsBuilder()
.build();
TestEventConsumer consumer = new TestEventConsumer();
EventManager eventManager = EventManager.builder(settings)
.register(consumer)
.build();
eventManager.getGenericEventManagerService().post(new GenericEventImpl(e -> {
logger.log(Level.INFO, "received event " + e);
}));
}
@Test
void testGenericEventWithWaitForSubscriber() throws InterruptedException, ExecutionException {
Settings settings = Settings.settingsBuilder()
.build();
TestEventConsumer consumer = new TestEventConsumer();
EventManager eventManager = EventManager.builder(settings)
.register(consumer)
.build();
CompletableFuture<GenericEvent> future = new CompletableFuture<>();
eventManager.getGenericEventManagerService().post(new GenericEventImpl(e -> {
logger.log(Level.INFO, "received event " + e);
future.complete(e);
}));
GenericEvent e = future.get();
logger.log(Level.INFO, "the event was received with result " + e + ", continuing");
}
@Test
void testGenericEventWithWaitForAllConsumers() throws ExecutionException, InterruptedException {
Settings settings = Settings.settingsBuilder()
.build();
TestEventConsumer consumer1 = new TestEventConsumer();
TestEventConsumer consumer2 = new TestEventConsumer();
EventManager eventManager = EventManager.builder(settings)
.register(consumer1)
.register(consumer2)
.loadEventConsumers()
.build();
CompletableFuture<GenericEvent> future = new CompletableFuture<>();
eventManager.getGenericEventManagerService().post(new GenericEventImpl(e -> {
logger.log(Level.INFO, "received event " + e);
}), future);
GenericEvent e = future.get();
logger.log(Level.INFO, "the event " + e + " was received by all consumers, continuing");
}
private static class TestEventConsumer implements EventConsumer {
TestEventConsumer() {
}
@Subscribe
public void onEvent(GenericEventImpl event) {
event.received();
}
}
}

View file

@ -13,10 +13,10 @@ public class ClockEventManagerTest {
TestClockEventConsumer clockEventConsumer = new TestClockEventConsumer(); TestClockEventConsumer clockEventConsumer = new TestClockEventConsumer();
Settings settings = Settings.settingsBuilder() Settings settings = Settings.settingsBuilder()
.put("event.clock.testclockevent.enabled", "true") .put("event.clock.testclockevent.enabled", "true")
.put("event.clock.testclockevent.class", "org.xbib.event.clock.TestClockEvent")
.put("event.clock.testclockevent.entry", "*/1 6-21 * * *") .put("event.clock.testclockevent.entry", "*/1 6-21 * * *")
.build(); .build();
EventManager eventManager = EventManager.builder(settings) EventManager eventManager = EventManager.builder()
.setSettings(settings)
.register(clockEventConsumer) .register(clockEventConsumer)
.build(); .build();
Thread.sleep(90000L); Thread.sleep(90000L);

View file

@ -1,4 +1,10 @@
package org.xbib.event.clock; package org.xbib.event.clock;
import org.xbib.event.common.ClockEventImpl;
import org.xbib.event.common.EventBuilder;
public class TestClockEvent extends ClockEventImpl { public class TestClockEvent extends ClockEventImpl {
public TestClockEvent(EventBuilder builder) {
super(builder);
}
} }

View file

@ -16,6 +16,6 @@ public class TestClockEventConsumer implements EventConsumer {
@Subscribe @Subscribe
@AllowConcurrentEvents @AllowConcurrentEvents
void onEvent(TestClockEvent event) { void onEvent(TestClockEvent event) {
logger.log(Level.INFO, "received test clock event on " + Instant.now() + " event instant = " + event.getInstant()); logger.log(Level.INFO, "received test clock event on " + Instant.now() + " event instant = " + event.getCreated());
} }
} }

View file

@ -0,0 +1,99 @@
package org.xbib.event.generic;
import org.junit.jupiter.api.Test;
import org.xbib.event.Event;
import org.xbib.event.EventConsumer;
import org.xbib.event.bus.Subscribe;
import org.xbib.event.common.EventImpl;
import org.xbib.event.common.EventManager;
import org.xbib.event.common.GenericEventImpl;
import org.xbib.settings.Settings;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Level;
import java.util.logging.Logger;
public class GenericEventManagerTest {
private static final Logger logger = Logger.getLogger(GenericEventManagerTest.class.getName());
@Test
void testGenericEvents() {
Settings settings = Settings.settingsBuilder()
.build();
TestEventConsumer consumer = new TestEventConsumer();
EventManager eventManager = EventManager.builder()
.setSettings(settings)
.register(consumer)
.build();
Event event = EventImpl.builder()
.setType("generic")
.setListener(e -> logger.log(Level.INFO, "received event " + e))
.build();
eventManager.getGenericEventManagerService().post(event);
}
@Test
void testGenericEventWithWaitForSubscriber() throws InterruptedException, ExecutionException {
Settings settings = Settings.settingsBuilder()
.build();
TestEventConsumer consumer = new TestEventConsumer();
EventManager eventManager = EventManager.builder()
.setSettings(settings)
.register(consumer)
.build();
CompletableFuture<Event> future = new CompletableFuture<>();
Event event = EventImpl.builder()
.setType("generic")
.setListener(e -> {
logger.log(Level.INFO, "received event " + e);
future.complete(e);
})
.build();
eventManager.getGenericEventManagerService().post(event);
Event e = future.get();
logger.log(Level.INFO, "the event was received with result " + e + ", continuing");
}
@Test
void testGenericEventWithWaitForAllConsumers() throws ExecutionException, InterruptedException, TimeoutException {
Settings settings = Settings.settingsBuilder()
.build();
TestEventConsumer consumer1 = new TestEventConsumer();
TestEventConsumer consumer2 = new TestEventConsumer();
EventManager eventManager = EventManager.builder()
.setSettings(settings)
.register(consumer1)
.register(consumer2)
.loadEventConsumers()
.build();
CompletableFuture<Event> future = new CompletableFuture<>();
Event event = GenericEventImpl.builder()
.setType("generic")
.setListener(e -> {
logger.log(Level.INFO, "received event " + e);
future.complete(e);
})
.build();
eventManager.getGenericEventManagerService().post((GenericEventImpl) event, future);
Event e = future.get(5L, TimeUnit.SECONDS);
if (e != null) {
logger.log(Level.INFO, "the event " + e + " was received by all consumers, continuing");
}
}
private static class TestEventConsumer implements EventConsumer {
TestEventConsumer() {
}
@Subscribe
public void onEvent(GenericEventImpl event) {
event.received();
}
}
}

View file

@ -21,11 +21,11 @@ public class FileFollowEventManagerTest {
TestFileFollowEventConsumer consumer = new TestFileFollowEventConsumer(); TestFileFollowEventConsumer consumer = new TestFileFollowEventConsumer();
Settings settings = Settings.settingsBuilder() Settings settings = Settings.settingsBuilder()
.put("event.filefollow.testfilefollowevent.enabled", "true") .put("event.filefollow.testfilefollowevent.enabled", "true")
.put("event.filefollow.testfilefollowevent.class", "org.xbib.event.io.file.TestFileFollowEvent")
.put("event.filefollow.testfilefollowevent.base", path.toString()) .put("event.filefollow.testfilefollowevent.base", path.toString())
.put("event.filefollow.testfilefollowevent.pattern", ".*") .put("event.filefollow.testfilefollowevent.pattern", ".*")
.build(); .build();
EventManager eventManager = EventManager.builder(settings) EventManager eventManager = EventManager.builder()
.setSettings(settings)
.register(consumer) .register(consumer)
.build(); .build();
Thread.sleep(5000L); Thread.sleep(5000L);

View file

@ -1,4 +1,10 @@
package org.xbib.event.path; package org.xbib.event.path;
import org.xbib.event.common.EventBuilder;
import org.xbib.event.common.FileFollowEventImpl;
public class TestFileFollowEvent extends FileFollowEventImpl { public class TestFileFollowEvent extends FileFollowEventImpl {
public TestFileFollowEvent(EventBuilder builder) {
super(builder);
}
} }

View file

@ -14,6 +14,6 @@ public class TestFileFollowEventConsumer implements EventConsumer {
@Subscribe @Subscribe
@AllowConcurrentEvents @AllowConcurrentEvents
void onEvent(TestFileFollowEvent event) { void onEvent(TestFileFollowEvent event) {
logger.log(Level.INFO, "received filefollow event path = " + event.getPath() + " content = " + event.getContent()); logger.log(Level.INFO, "received filefollow event path = " + event.getPath() + " content = " + event.getMessage());
} }
} }

View file

@ -1,4 +1,11 @@
package org.xbib.event.timer; package org.xbib.event.timer;
import org.xbib.event.common.EventBuilder;
import org.xbib.event.common.TimerEventImpl;
public class TestTimerEvent extends TimerEventImpl { public class TestTimerEvent extends TimerEventImpl {
public TestTimerEvent(EventBuilder builder) {
super(builder);
}
} }

View file

@ -4,7 +4,6 @@ import org.xbib.event.EventConsumer;
import org.xbib.event.bus.AllowConcurrentEvents; import org.xbib.event.bus.AllowConcurrentEvents;
import org.xbib.event.bus.Subscribe; import org.xbib.event.bus.Subscribe;
import java.io.IOException;
import java.time.Instant; import java.time.Instant;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -16,7 +15,7 @@ public class TestTimerEventConsumer implements EventConsumer {
@Subscribe @Subscribe
@AllowConcurrentEvents @AllowConcurrentEvents
void onEvent(TestTimerEvent event) { void onEvent(TestTimerEvent event) {
logger.log(Level.INFO, "received test timer event on " + Instant.now() + " event instant = " + event.getInstant()); logger.log(Level.INFO, "received test timer event on " + Instant.now() + " event instant = " + event.getCreated());
} }
} }

View file

@ -1,6 +1,7 @@
package org.xbib.event.timer; package org.xbib.event.timer;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import org.xbib.event.Payload;
import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManager;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
@ -15,14 +16,15 @@ public class TimerEventManagerTest {
TestTimerEventConsumer consumer = new TestTimerEventConsumer(); TestTimerEventConsumer consumer = new TestTimerEventConsumer();
Settings settings = Settings.settingsBuilder() Settings settings = Settings.settingsBuilder()
.put("event.timer.testtimerevent.enabled", "true") .put("event.timer.testtimerevent.enabled", "true")
.put("event.timer.testtimerevent.class", "org.xbib.event.timer.TestTimerEvent")
.build(); .build();
EventManager eventManager = EventManager.builder(settings) EventManager eventManager = EventManager.builder()
.setSettings(settings)
.register(consumer) .register(consumer)
.build(); .build();
TimerEventManagerService timerEventManager = eventManager.getTimerEventManagerService(); TimerEventManagerService timerEventManager = eventManager.getTimerEventManagerService();
timerEventManager.put("testtimerevent", Instant.now().plusSeconds(5L), Map.of("a", "b")); Payload payload = new Payload(Map.of("a", "b"));
timerEventManager.put("testtimerevent", Instant.now().plusSeconds(5L), payload);
Thread.sleep(10000L); Thread.sleep(10000L);
timerEventManager.close(); timerEventManager.shutdown();
} }
} }

View file

@ -1,5 +1,6 @@
package org.xbib.event.net.http; package org.xbib.event.net.http;
import org.xbib.event.Event;
import org.xbib.event.common.EventManager; import org.xbib.event.common.EventManager;
import org.xbib.event.common.EventImpl; import org.xbib.event.common.EventImpl;
import org.xbib.net.http.HttpHeaderNames; import org.xbib.net.http.HttpHeaderNames;
@ -27,7 +28,7 @@ public class HttpEventReceiverService {
.setPath("/event/{type}") .setPath("/event/{type}")
.setMethod(HttpMethod.POST) .setMethod(HttpMethod.POST)
.setHandler(ctx -> { .setHandler(ctx -> {
EventImpl event = EventImpl.fromJson(ctx.getRequest().asJson()); Event event = EventImpl.fromJson(ctx.getRequest().asJson());
if (event.isNullEvent()) { if (event.isNullEvent()) {
ctx.status(NOT_FOUND).done(); ctx.status(NOT_FOUND).done();
} else { } else {

View file

@ -137,26 +137,6 @@ public class DefaultSyslogMessage implements SyslogMessage {
return sb.toString(); return sb.toString();
} }
@Override
public void setType(String key) {
// ignore
}
@Override
public String getType() {
return builder.messageId;
}
@Override
public void setMap(Map<String, Object> map) {
// ignore
}
@Override
public Map<String, Object> getMap() {
return builder.map;
}
public static class Builder implements SyslogMessage.Builder { public static class Builder implements SyslogMessage.Builder {
LocalDateTime date; LocalDateTime date;

View file

@ -1,7 +1,6 @@
package org.xbib.event.syslog; package org.xbib.event.syslog;
import org.xbib.datastructures.api.Builder; import org.xbib.datastructures.api.Builder;
import org.xbib.event.Event;
import java.net.InetAddress; import java.net.InetAddress;
import java.time.LocalDateTime; import java.time.LocalDateTime;
@ -10,7 +9,7 @@ import java.util.List;
/** /**
* Represents a standard syslog message. * Represents a standard syslog message.
*/ */
public interface Message extends Event { public interface Message {
/** /**
* Date of the message. This is the parsed date from the client. * Date of the message. This is the parsed date from the client.
* *

View file

@ -63,7 +63,7 @@ public class MessageEncoder extends MessageToMessageEncoder<Message> {
buffer.writeCharSequence(message.severity(), charset); buffer.writeCharSequence(message.severity(), charset);
buffer.writeBytes(pipe); buffer.writeBytes(pipe);
int index = 0; int index = 0;
for (Map.Entry<String, Object> kvp : message.getMap().entrySet()) { /*for (Map.Entry<String, Object> kvp : message.getMap().entrySet()) {
if (index > 0) { if (index > 0) {
buffer.writeBytes(EncoderHelper.SPACE); buffer.writeBytes(EncoderHelper.SPACE);
} }
@ -71,7 +71,7 @@ public class MessageEncoder extends MessageToMessageEncoder<Message> {
buffer.writeBytes(EncoderHelper.EQUALS); buffer.writeBytes(EncoderHelper.EQUALS);
buffer.writeCharSequence(kvp.getValue().toString(), charset); buffer.writeCharSequence(kvp.getValue().toString(), charset);
index++; index++;
} }*/
output.add(buffer); output.add(buffer);
} }