json format of event
This commit is contained in:
parent
788ce1b7d7
commit
757e62040b
18 changed files with 239 additions and 249 deletions
|
@ -4,7 +4,6 @@ module org.xbib.event.common {
|
|||
exports org.xbib.event.bus;
|
||||
exports org.xbib.event.clock;
|
||||
exports org.xbib.event.common;
|
||||
exports org.xbib.event.generic;
|
||||
exports org.xbib.event.log;
|
||||
exports org.xbib.event.path;
|
||||
exports org.xbib.event.persistence;
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package org.xbib.event.clock;
|
||||
|
||||
import org.xbib.event.bus.EventBus;
|
||||
import org.xbib.event.Event;
|
||||
import org.xbib.event.common.AbstractEventManagerService;
|
||||
import org.xbib.event.common.EventManager;
|
||||
import org.xbib.event.common.EventManagerService;
|
||||
import org.xbib.settings.Settings;
|
||||
|
@ -8,8 +9,6 @@ import org.xbib.time.schedule.CronExpression;
|
|||
import org.xbib.time.schedule.CronSchedule;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
|
@ -17,21 +16,21 @@ import java.util.concurrent.ThreadFactory;
|
|||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
public class ClockEventManagerService implements EventManagerService {
|
||||
public class ClockEventManagerService extends AbstractEventManagerService implements EventManagerService {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(ClockEventManagerService.class.getName());
|
||||
|
||||
private CronSchedule<Integer> cronSchedule;
|
||||
private EventManager eventManager;
|
||||
|
||||
private List<String> suspended;
|
||||
private CronSchedule<Boolean> cronSchedule;
|
||||
|
||||
public ClockEventManagerService() {
|
||||
super();
|
||||
}
|
||||
|
||||
public ClockEventManagerService init(EventManager eventManager) {
|
||||
this.eventManager = eventManager;
|
||||
Settings settings = eventManager.getSettings();
|
||||
EventBus eventBus = eventManager.getEventBus();
|
||||
this.suspended = new ArrayList<>();
|
||||
ThreadFactory threadFactory = new ThreadFactory() {
|
||||
int n = 1;
|
||||
@Override
|
||||
|
@ -49,7 +48,7 @@ public class ClockEventManagerService implements EventManagerService {
|
|||
String entry = entrySettings.get("entry");
|
||||
if (entry != null) {
|
||||
try {
|
||||
ClockEventService clockEventService = new ClockEventService(this, eventBus, name);
|
||||
ClockEventService clockEventService = new ClockEventService(this, name);
|
||||
cronSchedule.add(name, CronExpression.parse(entry), clockEventService);
|
||||
logger.log(Level.INFO, "cron job " + name + " scheduled on " + entry);
|
||||
} catch (Exception e) {
|
||||
|
@ -65,20 +64,12 @@ public class ClockEventManagerService implements EventManagerService {
|
|||
return this;
|
||||
}
|
||||
|
||||
public List<String> getSuspended() {
|
||||
return suspended;
|
||||
}
|
||||
|
||||
public void suspend(String name) {
|
||||
suspended.add(name);
|
||||
}
|
||||
|
||||
public void resume(String name) {
|
||||
suspended.remove(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() throws IOException {
|
||||
cronSchedule.close();
|
||||
}
|
||||
|
||||
public void publish(Event event) {
|
||||
eventManager.publish(event);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -5,43 +5,38 @@ import java.util.logging.Level;
|
|||
import java.util.logging.Logger;
|
||||
|
||||
import org.xbib.event.Event;
|
||||
import org.xbib.event.bus.EventBus;
|
||||
import org.xbib.event.common.EventManager;
|
||||
|
||||
public class ClockEventService implements Callable<Integer> {
|
||||
public class ClockEventService implements Callable<Boolean> {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(ClockEventService.class.getName());
|
||||
|
||||
private final ClockEventManagerService manager;
|
||||
|
||||
private final EventBus eventBus;
|
||||
private final ClockEventManagerService clockEventManagerService;
|
||||
|
||||
private final String name;
|
||||
|
||||
public ClockEventService(ClockEventManagerService manager,
|
||||
EventBus eventBus,
|
||||
public ClockEventService(ClockEventManagerService clockEventManagerService,
|
||||
String name) {
|
||||
this.manager = manager;
|
||||
this.eventBus = eventBus;
|
||||
this.clockEventManagerService = clockEventManagerService;
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Integer call() {
|
||||
public Boolean call() {
|
||||
try {
|
||||
if (manager.getSuspended().contains(name)) {
|
||||
if (clockEventManagerService.getSuspended().contains(name)) {
|
||||
logger.log(Level.FINE, "clock event " + name + " suspended");
|
||||
return 1;
|
||||
return false;
|
||||
} else {
|
||||
Event clockEvent = EventManager.eventBuilder()
|
||||
.setType("clock")
|
||||
.build();
|
||||
eventBus.post(clockEvent);
|
||||
return 0;
|
||||
clockEventManagerService.publish(clockEvent);
|
||||
return true;
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
logger.log(Level.WARNING, t.getMessage(), t);
|
||||
return 1;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,25 @@
|
|||
package org.xbib.event.common;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
|
||||
public abstract class AbstractEventManagerService {
|
||||
|
||||
private List<String> suspended;
|
||||
|
||||
public AbstractEventManagerService() {
|
||||
this.suspended = new CopyOnWriteArrayList<>();
|
||||
}
|
||||
|
||||
public void suspend(String name) {
|
||||
suspended.add(name);
|
||||
}
|
||||
|
||||
public void resume(String name) {
|
||||
suspended.remove(name);
|
||||
}
|
||||
|
||||
public List<String> getSuspended() {
|
||||
return suspended;
|
||||
}
|
||||
}
|
|
@ -11,16 +11,18 @@ import java.util.Map;
|
|||
import java.util.ServiceLoader;
|
||||
|
||||
import org.xbib.datastructures.json.tiny.Json;
|
||||
import org.xbib.datastructures.json.tiny.JsonBuilder;
|
||||
import org.xbib.event.Event;
|
||||
import org.xbib.event.EventConsumer;
|
||||
import org.xbib.event.Listener;
|
||||
import org.xbib.event.Payload;
|
||||
import org.xbib.event.bus.AsyncEventBus;
|
||||
import org.xbib.event.bus.EventBus;
|
||||
import org.xbib.event.bus.Subscriber;
|
||||
import org.xbib.event.bus.SubscriberExceptionContext;
|
||||
import org.xbib.event.bus.SubscriberExceptionHandler;
|
||||
import org.xbib.event.bus.SubscriberRegistry;
|
||||
import org.xbib.event.clock.ClockEventManagerService;
|
||||
import org.xbib.event.generic.GenericEventManagerService;
|
||||
import org.xbib.event.path.FileFollowEventManagerService;
|
||||
import org.xbib.event.path.PathEventManagerService;
|
||||
import org.xbib.event.timer.TimerEventManagerService;
|
||||
|
@ -31,12 +33,14 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
public final class EventManager {
|
||||
public final class EventManager extends AbstractEventManagerService implements EventManagerService {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(EventManager.class.getName());
|
||||
|
||||
|
@ -47,6 +51,7 @@ public final class EventManager {
|
|||
private final Map<Class<? extends EventManagerService>, EventManagerService> eventManagerServices;
|
||||
|
||||
private EventManager(EventManagerBuilder builder) {
|
||||
super();
|
||||
this.builder = builder;
|
||||
eventTypes.put("null", NullEvent.class);
|
||||
eventTypes.put("generic", GenericEventImpl.class);
|
||||
|
@ -59,7 +64,7 @@ public final class EventManager {
|
|||
}
|
||||
logger.log(Level.INFO, "installed events = " + eventTypes.keySet());
|
||||
this.eventManagerServices = new HashMap<>();
|
||||
eventManagerServices.put(GenericEventManagerService.class, new GenericEventManagerService().init(this));
|
||||
eventManagerServices.put(this.getClass(), this);
|
||||
eventManagerServices.put(ClockEventManagerService.class, new ClockEventManagerService().init(this));
|
||||
eventManagerServices.put(TimerEventManagerService.class, new TimerEventManagerService().init(this));
|
||||
eventManagerServices.put(FileFollowEventManagerService.class, new FileFollowEventManagerService().init(this));
|
||||
|
@ -70,6 +75,10 @@ public final class EventManager {
|
|||
logger.log(Level.INFO, "installed event service managers = " + eventManagerServices.keySet());
|
||||
}
|
||||
|
||||
public EventManagerService init(EventManager eventManager) {
|
||||
return this;
|
||||
}
|
||||
|
||||
public static EventManagerBuilder builder() {
|
||||
return new EventManagerBuilder();
|
||||
}
|
||||
|
@ -94,19 +103,11 @@ public final class EventManager {
|
|||
return builder.executorService;
|
||||
}
|
||||
|
||||
public void dispatch(Event event) {
|
||||
getGenericEventManagerService().post(event);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T extends EventManagerService> T getEventManagerService(Class<T> cl) {
|
||||
return (T) eventManagerServices.get(cl);
|
||||
}
|
||||
|
||||
public GenericEventManagerService getGenericEventManagerService() {
|
||||
return (GenericEventManagerService) eventManagerServices.get(GenericEventManagerService.class);
|
||||
}
|
||||
|
||||
public ClockEventManagerService getClockEventManagerService() {
|
||||
return (ClockEventManagerService) eventManagerServices.get(ClockEventManagerService.class);
|
||||
}
|
||||
|
@ -123,7 +124,19 @@ public final class EventManager {
|
|||
return (PathEventManagerService) eventManagerServices.get(PathEventManagerService.class);
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
public void publish(Event event) {
|
||||
getEventBus().post(event);
|
||||
}
|
||||
|
||||
public void publish(GenericEventImpl event,
|
||||
CompletableFuture<Event> future) {
|
||||
SubscriberRegistry subscriberRegistry = getEventBus().getSubscribers();
|
||||
Set<Subscriber> set = subscriberRegistry.getSubscribersForTesting(event.getClass());
|
||||
event.setListener(new WrappedListener(event.getListener(), set.size(), future));
|
||||
publish(event);
|
||||
}
|
||||
|
||||
public void shutdown() throws IOException {
|
||||
for (EventConsumer eventConsumer : builder.eventConsumers) {
|
||||
if (eventConsumer instanceof Closeable closeable) {
|
||||
closeable.close();
|
||||
|
@ -131,7 +144,9 @@ public final class EventManager {
|
|||
}
|
||||
for (EventManagerService service : eventManagerServices.values()) {
|
||||
try {
|
||||
service.shutdown();
|
||||
if (service != this) {
|
||||
service.shutdown();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
logger.log(Level.SEVERE, e.getMessage(), e);
|
||||
}
|
||||
|
@ -244,6 +259,33 @@ public final class EventManager {
|
|||
}
|
||||
}
|
||||
|
||||
private static class WrappedListener implements Listener {
|
||||
|
||||
private final Listener listener;
|
||||
|
||||
private int size;
|
||||
|
||||
private final CompletableFuture<Event> future;
|
||||
|
||||
public WrappedListener(Listener listener, int size, CompletableFuture<Event> future) {
|
||||
this.listener = listener;
|
||||
this.size = size;
|
||||
this.future = future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void listen(Event event) {
|
||||
if (listener != null) {
|
||||
listener.listen(event);
|
||||
} else {
|
||||
logger.log(Level.WARNING, "listener not set");
|
||||
}
|
||||
if (--size == 0) {
|
||||
future.complete(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static Event eventOf(String eventType,
|
||||
String code,
|
||||
String message,
|
||||
|
@ -338,8 +380,22 @@ public final class EventManager {
|
|||
return builder.fileSize;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toJson() throws IOException {
|
||||
return Json.toString(builder.payload);
|
||||
JsonBuilder builder = JsonBuilder.builder();
|
||||
builder.beginMap();
|
||||
builder.fieldIfNotNull("type", getType());
|
||||
builder.fieldIfNotNull("code", getCode());
|
||||
builder.fieldIfNotNull("message", getMessage());
|
||||
builder.buildKey("payload").buildMap(getPayload());
|
||||
builder.fieldIfNotNull("created", getCreated() != null ? getCreated().toString() : null);
|
||||
builder.fieldIfNotNull("scheduled", getScheduledFor() != null ? getScheduledFor().toString() : null);
|
||||
builder.fieldIfNotNull("path", getPath() != null ? getPath().toAbsolutePath().toString() : null);
|
||||
builder.fieldIfNotNull("base", getBase());
|
||||
builder.fieldIfNotNull("suffix", getSuffix());
|
||||
builder.fieldIfNotNull("filesize", getFileSize());
|
||||
builder.endMap();
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -507,5 +563,4 @@ public final class EventManager {
|
|||
return pos >= 0 ? name.substring(pos + 1) : null;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,77 +0,0 @@
|
|||
package org.xbib.event.generic;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
import org.xbib.event.Event;
|
||||
import org.xbib.event.Listener;
|
||||
import org.xbib.event.bus.EventBus;
|
||||
import org.xbib.event.bus.Subscriber;
|
||||
import org.xbib.event.bus.SubscriberRegistry;
|
||||
import org.xbib.event.common.EventManager;
|
||||
import org.xbib.event.common.EventManagerService;
|
||||
import org.xbib.event.common.GenericEventImpl;
|
||||
|
||||
public class GenericEventManagerService implements EventManagerService {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(GenericEventManagerService.class.getName());
|
||||
|
||||
private EventBus eventBus;
|
||||
|
||||
public GenericEventManagerService() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public GenericEventManagerService init(EventManager eventManager) {
|
||||
this.eventBus = eventManager.getEventBus();
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() throws IOException {
|
||||
|
||||
}
|
||||
|
||||
public void post(Object event) {
|
||||
eventBus.post(event);
|
||||
}
|
||||
|
||||
public void post(GenericEventImpl event,
|
||||
CompletableFuture<Event> future) {
|
||||
SubscriberRegistry subscriberRegistry = eventBus.getSubscribers();
|
||||
Set<Subscriber> set = subscriberRegistry.getSubscribersForTesting(event.getClass());
|
||||
logger.log(Level.INFO, "set = " + set);
|
||||
event.setListener(new WrappedListener(event.getListener(), set.size(), future));
|
||||
post(event);
|
||||
}
|
||||
|
||||
static class WrappedListener implements Listener {
|
||||
|
||||
private final Listener listener;
|
||||
|
||||
private int size;
|
||||
|
||||
private final CompletableFuture<Event> future;
|
||||
|
||||
public WrappedListener(Listener listener, int size, CompletableFuture<Event> future) {
|
||||
this.listener = listener;
|
||||
this.size = size;
|
||||
this.future = future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void listen(Event event) {
|
||||
if (listener != null) {
|
||||
listener.listen(event);
|
||||
} else {
|
||||
logger.log(Level.WARNING, "listener not set");
|
||||
}
|
||||
if (--size == 0) {
|
||||
future.complete(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,6 +1,8 @@
|
|||
package org.xbib.event.path;
|
||||
|
||||
import org.xbib.event.Event;
|
||||
import org.xbib.event.bus.EventBus;
|
||||
import org.xbib.event.common.AbstractEventManagerService;
|
||||
import org.xbib.event.common.EventManager;
|
||||
import org.xbib.event.common.EventManagerService;
|
||||
import org.xbib.settings.Settings;
|
||||
|
@ -17,22 +19,26 @@ import java.util.logging.Level;
|
|||
import java.util.logging.Logger;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
public class FileFollowEventManagerService implements EventManagerService {
|
||||
public class FileFollowEventManagerService extends AbstractEventManagerService implements EventManagerService {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(FileFollowEventManagerService.class.getName());
|
||||
|
||||
private EventManager eventManager;
|
||||
|
||||
private Map<Future<?>, FileFollowEventService> eventServiceMap;
|
||||
|
||||
public FileFollowEventManagerService() {
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileFollowEventManagerService init(EventManager eventManager) {
|
||||
this.eventManager = eventManager;
|
||||
Settings settings = eventManager.getSettings();
|
||||
EventBus eventBus = eventManager.getEventBus();
|
||||
ExecutorService executorService = eventManager.getExecutorService();
|
||||
this.eventServiceMap = new LinkedHashMap<>();
|
||||
for (Map.Entry<String, Settings> entry : settings.getGroups("event.filefollow").entrySet()) {
|
||||
String name = entry.getKey();
|
||||
Settings definition = entry.getValue();
|
||||
if (definition.getAsBoolean("enabled", true)) {
|
||||
String baseStr = definition.get("base");
|
||||
|
@ -42,12 +48,12 @@ public class FileFollowEventManagerService implements EventManagerService {
|
|||
try {
|
||||
Path base = Paths.get(baseStr);
|
||||
Pattern pattern = Pattern.compile(patternStr);
|
||||
FileFollowEventService fileFollowEventService = new FileFollowEventService(definition, eventBus, base, pattern);
|
||||
FileFollowEventService fileFollowEventService = new FileFollowEventService(this, name, base, pattern);
|
||||
Future<?> future = executorService.submit(fileFollowEventService);
|
||||
eventServiceMap.put(future, fileFollowEventService);
|
||||
logger.log(Level.INFO, "file follow service " + entry.getKey() + " with base " + base + " and pattern " + pattern + " added");
|
||||
logger.log(Level.INFO, "file follow service " + name + " with base " + base + " and pattern " + pattern + " added");
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.SEVERE, "unable to create file follow service " + entry.getKey() + ", reason " + e.getMessage(), e);
|
||||
logger.log(Level.SEVERE, "unable to create file follow service " +name + ", reason " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -65,4 +71,8 @@ public class FileFollowEventManagerService implements EventManagerService {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void publish(Event event) {
|
||||
eventManager.publish(event);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,9 +1,7 @@
|
|||
package org.xbib.event.path;
|
||||
|
||||
import org.xbib.event.Event;
|
||||
import org.xbib.event.bus.EventBus;
|
||||
import org.xbib.event.common.EventManager;
|
||||
import org.xbib.settings.Settings;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
@ -33,7 +31,9 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
|
|||
|
||||
private static final Logger logger = Logger.getLogger(FileFollowEventService.class.getName());
|
||||
|
||||
private final EventBus eventBus;
|
||||
private final FileFollowEventManagerService fileFollowEventManagerService;
|
||||
|
||||
private final String name;
|
||||
|
||||
private final Path base;
|
||||
|
||||
|
@ -45,11 +45,12 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
|
|||
|
||||
private volatile boolean keepWatching;
|
||||
|
||||
public FileFollowEventService(Settings settings,
|
||||
EventBus eventBus,
|
||||
public FileFollowEventService(FileFollowEventManagerService fileFollowEventManagerService,
|
||||
String name,
|
||||
Path base,
|
||||
Pattern pattern) throws IOException {
|
||||
this.eventBus = eventBus;
|
||||
this.fileFollowEventManagerService = fileFollowEventManagerService;
|
||||
this.name = name;
|
||||
this.base = base;
|
||||
this.pattern = pattern;
|
||||
FileSystem fileSystem = base.getFileSystem();
|
||||
|
@ -89,13 +90,17 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
|
|||
String content = readRange(channel, lastSize, currentSize);
|
||||
// split content by line, this allows pattern matching without preprocessing in worker
|
||||
for (String line : content.split("\n")) {
|
||||
Event event = EventManager.eventBuilder()
|
||||
.setType("filefollow")
|
||||
.setCode(base.toString())
|
||||
.setPath(path)
|
||||
.setMessage(line)
|
||||
.build();
|
||||
eventBus.post(event);
|
||||
if (fileFollowEventManagerService.getSuspended().contains(name)) {
|
||||
logger.log(Level.WARNING, name + " is suspended");
|
||||
} else {
|
||||
Event event = EventManager.eventBuilder()
|
||||
.setType("filefollow")
|
||||
.setCode(base.toString())
|
||||
.setPath(path)
|
||||
.setMessage(line)
|
||||
.build();
|
||||
fileFollowEventManagerService.publish(event);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,7 +3,7 @@ package org.xbib.event.path;
|
|||
import org.xbib.datastructures.api.TimeValue;
|
||||
import org.xbib.datastructures.json.tiny.Json;
|
||||
import org.xbib.event.Event;
|
||||
import org.xbib.event.bus.EventBus;
|
||||
import org.xbib.event.common.AbstractEventManagerService;
|
||||
import org.xbib.event.common.EventManager;
|
||||
import org.xbib.event.common.EventManagerService;
|
||||
import org.xbib.settings.Settings;
|
||||
|
@ -14,7 +14,6 @@ import java.io.Writer;
|
|||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.ArrayList;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -24,28 +23,26 @@ import java.util.logging.Level;
|
|||
import java.util.logging.Logger;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public class PathEventManagerService implements EventManagerService {
|
||||
public class PathEventManagerService extends AbstractEventManagerService implements EventManagerService {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(PathEventManagerService.class.getName());
|
||||
|
||||
private EventBus eventBus;
|
||||
private EventManager eventManager;
|
||||
|
||||
private ExecutorService executorService;
|
||||
|
||||
private Map<Future<?>, PathEventService> eventServiceMap;
|
||||
|
||||
private List<String> suspendedQueues;
|
||||
|
||||
public PathEventManagerService() {
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
public PathEventManagerService init(EventManager eventManager) {
|
||||
this.eventManager = eventManager;
|
||||
Settings settings = eventManager.getSettings();
|
||||
this.eventBus = eventManager.getEventBus();
|
||||
this.executorService = eventManager.getExecutorService();
|
||||
this.eventServiceMap = new LinkedHashMap<>();
|
||||
this.suspendedQueues = new ArrayList<>();
|
||||
for (Map.Entry<String, Settings> entry : settings.getGroups("event.path").entrySet()) {
|
||||
try {
|
||||
String name = entry.getKey();
|
||||
|
@ -54,9 +51,12 @@ public class PathEventManagerService implements EventManagerService {
|
|||
Path path = Paths.get(definition.get("path", "/var/tmp/" + name));
|
||||
TimeValue lifetime = definition.getAsTime("lifetime", TimeValue.timeValueHours(72));
|
||||
String eventType = definition.get("type", "path");
|
||||
createPathEventService(name, path, eventType, lifetime);
|
||||
createQueue(name, path);
|
||||
PathEventService pathEventService = new PathEventService(this, name, path, eventType, lifetime);
|
||||
add(pathEventService);
|
||||
pathEventService.drainIncoming();
|
||||
} else {
|
||||
logger.log(Level.WARNING, "path servive definition not enabled in configuration");
|
||||
logger.log(Level.WARNING, "path service definition not enabled in configuration");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.log(Level.SEVERE, "unable to create event path service " + entry.getKey() + ", reason " + e.getMessage(), e);
|
||||
|
@ -65,21 +65,29 @@ public class PathEventManagerService implements EventManagerService {
|
|||
return this;
|
||||
}
|
||||
|
||||
public void createPathEventService(String name, Path path, String eventType, TimeValue lifetime) throws IOException {
|
||||
createQueue(name, path);
|
||||
PathEventService pathEventService = new PathEventService(this, eventBus, name, path, eventType, lifetime);
|
||||
add(pathEventService);
|
||||
}
|
||||
|
||||
public void add(PathEventService pathEventService) {
|
||||
Future<?> future = executorService.submit(pathEventService);
|
||||
eventServiceMap.put(future, pathEventService);
|
||||
logger.log(Level.INFO, "path event service " + pathEventService + " added");
|
||||
}
|
||||
|
||||
public void publish(String eventType, Path path) {
|
||||
Event event = EventManager.eventOf(eventType, null, null, path);
|
||||
eventManager.publish(event);
|
||||
}
|
||||
|
||||
public void failEvent(String eventType, Path path) {
|
||||
try {
|
||||
Event event = EventManager.eventOf(eventType, null, null, path);
|
||||
event.fail();
|
||||
} catch (IOException e) {
|
||||
logger.log(Level.SEVERE, "unable to fail event because of " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() throws IOException {
|
||||
logger.log(Level.INFO, "shut down all path event services");
|
||||
logger.log(Level.INFO, "shutting down all path event services");
|
||||
eventServiceMap.forEach((k, v) -> {
|
||||
k.cancel(true);
|
||||
try {
|
||||
|
@ -103,30 +111,18 @@ public class PathEventManagerService implements EventManagerService {
|
|||
});
|
||||
}
|
||||
|
||||
public List<String> getSuspendedQueues() {
|
||||
return suspendedQueues;
|
||||
}
|
||||
|
||||
public void suspend(String queue) {
|
||||
suspendedQueues.add(queue);
|
||||
}
|
||||
|
||||
public void resume(String queue) {
|
||||
suspendedQueues.remove(queue);
|
||||
}
|
||||
|
||||
public boolean put(Path path, String key, Map<String,Object> map) throws IOException {
|
||||
return put(path, key, ".json", Json.toString(map));
|
||||
}
|
||||
|
||||
public boolean putIfNotExists(Path path, String key, Map<String,Object> map) throws IOException {
|
||||
if (!exists(path, key, ".json")) {
|
||||
return put(path, key, ".json", Json.toString(map));
|
||||
return put(path, key, map);
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean put(Path path, String key, Map<String,Object> map) throws IOException {
|
||||
return put(path, key, ".json", Json.toString(map));
|
||||
}
|
||||
|
||||
public boolean put(Path path, String key, String suffix, String string) throws IOException {
|
||||
String keyFileName = key + suffix;
|
||||
if (Files.exists(path.resolve(Event.INCOMING).resolve(keyFileName)) ||
|
||||
|
|
|
@ -4,8 +4,6 @@ import java.nio.file.FileVisitResult;
|
|||
import java.nio.file.SimpleFileVisitor;
|
||||
import org.xbib.datastructures.api.TimeValue;
|
||||
import org.xbib.event.Event;
|
||||
import org.xbib.event.bus.EventBus;
|
||||
import org.xbib.event.common.EventManager;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
@ -25,14 +23,12 @@ import java.util.concurrent.Callable;
|
|||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
public class PathEventService implements Callable<Integer>, Closeable {
|
||||
public class PathEventService implements Callable<Boolean>, Closeable {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(PathEventService.class.getName());
|
||||
|
||||
private final PathEventManagerService pathEventManager;
|
||||
|
||||
private final EventBus eventBus;
|
||||
|
||||
private final Path path;
|
||||
|
||||
private final String name;
|
||||
|
@ -43,18 +39,14 @@ public class PathEventService implements Callable<Integer>, Closeable {
|
|||
|
||||
private final WatchService watchService;
|
||||
|
||||
private int eventCount;
|
||||
|
||||
private volatile boolean keepWatching;
|
||||
|
||||
public PathEventService(PathEventManagerService pathEventManager,
|
||||
EventBus eventBus,
|
||||
String name,
|
||||
Path path,
|
||||
String eventType,
|
||||
TimeValue lifetime) throws IOException {
|
||||
this.pathEventManager = pathEventManager;
|
||||
this.eventBus = eventBus;
|
||||
this.name = name;
|
||||
this.path = path;
|
||||
this.eventType = eventType;
|
||||
|
@ -69,7 +61,7 @@ public class PathEventService implements Callable<Integer>, Closeable {
|
|||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public Integer call() {
|
||||
public Boolean call() {
|
||||
try {
|
||||
logger.log(Level.INFO, "watch service running on " + path.resolve(Event.INCOMING));
|
||||
while (keepWatching && watchService != null) {
|
||||
|
@ -86,10 +78,10 @@ public class PathEventService implements Callable<Integer>, Closeable {
|
|||
String watchEventContext = pathWatchEvent.context().toString();
|
||||
Path p = path.resolve(Event.INCOMING).resolve(watchEventContext);
|
||||
logger.log(Level.FINE, "watch event " + pathWatchEvent + " context = " + watchEventContext + " path = " + p);
|
||||
if (pathEventManager.getSuspendedQueues().contains(name)) {
|
||||
failEvent(p);
|
||||
if (pathEventManager.getSuspended().contains(name)) {
|
||||
pathEventManager.failEvent(eventType, p);
|
||||
} else {
|
||||
postEvent(p);
|
||||
pathEventManager.publish(eventType, p);
|
||||
}
|
||||
}
|
||||
watchKey.reset();
|
||||
|
@ -102,7 +94,7 @@ public class PathEventService implements Callable<Integer>, Closeable {
|
|||
} catch (Exception e) {
|
||||
logger.log(Level.SEVERE, e.getMessage(), e);
|
||||
}
|
||||
return eventCount;
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -120,7 +112,7 @@ public class PathEventService implements Callable<Integer>, Closeable {
|
|||
|
||||
public void drainIncoming() throws IOException {
|
||||
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path.resolve(Event.INCOMING))) {
|
||||
stream.forEach(this::postEvent);
|
||||
stream.forEach(p -> pathEventManager.publish(eventType, p));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -149,21 +141,6 @@ public class PathEventService implements Callable<Integer>, Closeable {
|
|||
}
|
||||
}
|
||||
|
||||
private void postEvent(Path path) {
|
||||
Event event = EventManager.eventOf(eventType, null, null, path);
|
||||
eventBus.post(event);
|
||||
eventCount++;
|
||||
}
|
||||
|
||||
private void failEvent(Path file) {
|
||||
try {
|
||||
Event event = EventManager.eventFromFile(file);
|
||||
event.fail();
|
||||
} catch (IOException e) {
|
||||
logger.log(Level.SEVERE, "unable to fail event because of " + e.getMessage(), e);
|
||||
}
|
||||
}
|
||||
|
||||
private static void delete(Path path) {
|
||||
if (path == null) {
|
||||
return;
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
package org.xbib.event.timer;
|
||||
|
||||
import org.xbib.event.Event;
|
||||
import org.xbib.event.Payload;
|
||||
import org.xbib.event.bus.EventBus;
|
||||
import org.xbib.event.common.AbstractEventManagerService;
|
||||
import org.xbib.event.common.EventManager;
|
||||
import org.xbib.event.common.EventManagerService;
|
||||
import org.xbib.event.persistence.FilePersistenceStore;
|
||||
|
@ -20,26 +21,29 @@ import java.util.Map;
|
|||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
||||
public class TimerEventManagerService implements EventManagerService {
|
||||
public class TimerEventManagerService extends AbstractEventManagerService implements EventManagerService {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(TimerEventManagerService.class.getName());
|
||||
|
||||
private EventManager eventManager;
|
||||
|
||||
private Map<String, TimerEventService> services;
|
||||
|
||||
public TimerEventManagerService() {
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimerEventManagerService init(EventManager eventManager) {
|
||||
this.eventManager = eventManager;
|
||||
Settings settings = eventManager.getSettings();
|
||||
EventBus eventBus = eventManager.getEventBus();
|
||||
this.services = new LinkedHashMap<>();
|
||||
for (Map.Entry<String, Settings> entry : settings.getGroups("event.timer").entrySet()) {
|
||||
String name = entry.getKey();
|
||||
Settings timerSettings = entry.getValue();
|
||||
try {
|
||||
PersistenceStore<String, Object> persistenceStore = new FilePersistenceStore(timerSettings, name);
|
||||
TimerEventService timerEventService = new TimerEventService(eventBus, name, ZoneId.systemDefault(), persistenceStore);
|
||||
TimerEventService timerEventService = new TimerEventService(this, name, ZoneId.systemDefault(), persistenceStore);
|
||||
services.put(name, timerEventService);
|
||||
logger.log(Level.INFO, "timer " + name + " active: " + timerEventService);
|
||||
} catch (Exception e) {
|
||||
|
@ -49,9 +53,21 @@ public class TimerEventManagerService implements EventManagerService {
|
|||
return this;
|
||||
}
|
||||
|
||||
public boolean publish(String name,
|
||||
String timeSpec,
|
||||
Payload payload) throws ParseException, IOException {
|
||||
@Override
|
||||
public void shutdown() throws IOException {
|
||||
for (Map.Entry<String, TimerEventService> entry : services.entrySet()) {
|
||||
logger.log(Level.INFO, "closing timer " + entry.getKey());
|
||||
entry.getValue().close();
|
||||
}
|
||||
}
|
||||
|
||||
public void publish(Event event) {
|
||||
eventManager.publish(event);
|
||||
}
|
||||
|
||||
public boolean schedule(String name,
|
||||
String timeSpec,
|
||||
Payload payload) throws ParseException, IOException {
|
||||
if (services.containsKey(name)) {
|
||||
Span span = Chronic.parse(timeSpec);
|
||||
if (span != null) {
|
||||
|
@ -69,9 +85,9 @@ public class TimerEventManagerService implements EventManagerService {
|
|||
return false;
|
||||
}
|
||||
|
||||
public boolean publish(String service,
|
||||
Instant instant,
|
||||
Payload payload) throws IOException {
|
||||
public boolean schedule(String service,
|
||||
Instant instant,
|
||||
Payload payload) throws IOException {
|
||||
if (services.containsKey(service)) {
|
||||
services.get(service).schedule(instant, payload);
|
||||
return true;
|
||||
|
@ -87,12 +103,4 @@ public class TimerEventManagerService implements EventManagerService {
|
|||
entry.getValue().purge();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() throws IOException {
|
||||
for (Map.Entry<String, TimerEventService> entry : services.entrySet()) {
|
||||
logger.log(Level.INFO, "closing timer " + entry.getKey());
|
||||
entry.getValue().close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,7 +2,6 @@ package org.xbib.event.timer;
|
|||
|
||||
import org.xbib.event.Event;
|
||||
import org.xbib.event.Payload;
|
||||
import org.xbib.event.bus.EventBus;
|
||||
import org.xbib.event.common.EventManager;
|
||||
import org.xbib.event.persistence.PersistenceStore;
|
||||
|
||||
|
@ -25,8 +24,10 @@ import java.util.logging.Logger;
|
|||
class TimerEventService implements Closeable {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(TimerEventService.class.getName());
|
||||
|
||||
private final EventBus eventBus;
|
||||
|
||||
private final TimerEventManagerService timerEventManagerService;
|
||||
|
||||
private final String name;
|
||||
|
||||
private final ZoneId zoneId;
|
||||
|
||||
|
@ -34,11 +35,12 @@ class TimerEventService implements Closeable {
|
|||
|
||||
private final Timer timer;
|
||||
|
||||
public TimerEventService(EventBus eventBus,
|
||||
public TimerEventService(TimerEventManagerService timerEventManagerService,
|
||||
String name,
|
||||
ZoneId zoneId,
|
||||
PersistenceStore<String, Object> persistenceStore) throws IOException {
|
||||
this.eventBus = eventBus;
|
||||
this.timerEventManagerService = timerEventManagerService;
|
||||
this.name = name;
|
||||
this.zoneId = zoneId;
|
||||
this.persistenceStore = persistenceStore;
|
||||
this.timer = new Timer();
|
||||
|
@ -97,12 +99,16 @@ class TimerEventService implements Closeable {
|
|||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
if (timerEventManagerService.getSuspended().contains(name)) {
|
||||
logger.log(Level.FINE, "timer event " + name + " suspended");
|
||||
return;
|
||||
}
|
||||
Event timerEvent = EventManager.eventBuilder()
|
||||
.setType("timer")
|
||||
.setCode(name)
|
||||
.setPayload(payload)
|
||||
.build();
|
||||
logger.log(Level.FINE, "posting timer event " + timerEvent.getClass().getName() + " with payload = " + payload);
|
||||
eventBus.post(timerEvent);
|
||||
timerEventManagerService.publish(timerEvent);
|
||||
logger.log(Level.FINE, "persistence before remove: " + persistenceStore.get("tasks"));
|
||||
if (persistenceStore.remove("tasks", this.payload)) {
|
||||
logger.log(Level.FINE, "removal done");
|
||||
|
|
|
@ -20,6 +20,6 @@ public class ClockEventManagerTest {
|
|||
.register(clockEventConsumer)
|
||||
.build();
|
||||
Thread.sleep(90000L);
|
||||
eventManager.close();
|
||||
eventManager.shutdown();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,11 +1,9 @@
|
|||
package org.xbib.event.generic;
|
||||
package org.xbib.event.common;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.xbib.event.Event;
|
||||
import org.xbib.event.EventConsumer;
|
||||
import org.xbib.event.bus.Subscribe;
|
||||
import org.xbib.event.common.EventManager;
|
||||
import org.xbib.event.common.GenericEventImpl;
|
||||
import org.xbib.settings.Settings;
|
||||
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
@ -32,7 +30,7 @@ public class GenericEventManagerTest {
|
|||
.setType("generic")
|
||||
.setListener(e -> logger.log(Level.INFO, "received event " + e))
|
||||
.build();
|
||||
eventManager.getGenericEventManagerService().post(event);
|
||||
eventManager.publish(event);
|
||||
// we must wait for a certain time because we do not use a future
|
||||
Thread.sleep(500L);
|
||||
assertEquals(1, consumer.getCount());
|
||||
|
@ -52,7 +50,7 @@ public class GenericEventManagerTest {
|
|||
future.complete(e);
|
||||
})
|
||||
.build();
|
||||
eventManager.getGenericEventManagerService().post(event);
|
||||
eventManager.publish(event);
|
||||
Event e = future.get(1000L, TimeUnit.MILLISECONDS);
|
||||
assertNotNull(e);
|
||||
}
|
||||
|
@ -76,7 +74,7 @@ public class GenericEventManagerTest {
|
|||
future.complete(e);
|
||||
})
|
||||
.build();
|
||||
eventManager.getGenericEventManagerService().post((GenericEventImpl) event, future);
|
||||
eventManager.publish((GenericEventImpl) event, future);
|
||||
Event e = future.get(5L, TimeUnit.SECONDS);
|
||||
if (e != null) {
|
||||
logger.log(Level.INFO, "the event " + e + " was received by all consumers");
|
|
@ -37,6 +37,6 @@ public class FileFollowEventManagerTest {
|
|||
Thread.sleep(1000L);
|
||||
Files.delete(testTxt);
|
||||
Files.delete(path);
|
||||
eventManager.close();
|
||||
eventManager.shutdown();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,7 +34,8 @@ public class PathEventManagerTest {
|
|||
bufferedWriter.write("Hello");
|
||||
}
|
||||
Thread.sleep(2000L);
|
||||
eventManager.close();
|
||||
eventManager.shutdown();
|
||||
// extra destroy to clean up test
|
||||
eventManager.getPathEventManagerService().destroy();
|
||||
}
|
||||
|
||||
|
@ -57,7 +58,8 @@ public class PathEventManagerTest {
|
|||
bufferedWriter.write("Hello");
|
||||
}
|
||||
Thread.sleep(2000L);
|
||||
eventManager.close();
|
||||
eventManager.shutdown();
|
||||
// extra destroy to clean up test
|
||||
eventManager.getPathEventManagerService().destroy();
|
||||
}
|
||||
|
||||
|
|
|
@ -23,7 +23,7 @@ public class TimerEventManagerTest {
|
|||
.build();
|
||||
TimerEventManagerService timerEventManager = eventManager.getTimerEventManagerService();
|
||||
Payload payload = new Payload(Map.of("a", "b"));
|
||||
timerEventManager.publish("testtimerevent", Instant.now().plusSeconds(5L), payload);
|
||||
timerEventManager.schedule("testtimerevent", Instant.now().plusSeconds(5L), payload);
|
||||
Thread.sleep(10000L);
|
||||
timerEventManager.shutdown();
|
||||
}
|
||||
|
|
|
@ -24,14 +24,14 @@ public class HttpEventReceiverService {
|
|||
public HttpService createService(String prefix) {
|
||||
return BaseHttpService.builder()
|
||||
.setPrefix(prefix)
|
||||
.setPath("/event/{type}")
|
||||
.setPath("/event")
|
||||
.setMethod(HttpMethod.POST)
|
||||
.setHandler(ctx -> {
|
||||
Event event = EventManager.eventFromJson(ctx.getRequest().asJson());
|
||||
if (event.isNullEvent()) {
|
||||
ctx.status(NOT_FOUND).done();
|
||||
} else {
|
||||
eventManager.dispatch(event);
|
||||
eventManager.publish(event);
|
||||
ctx.status(OK)
|
||||
.header(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
|
||||
.charset(StandardCharsets.UTF_8)
|
||||
|
|
Loading…
Reference in a new issue