add path event manager
This commit is contained in:
parent
7805e987ed
commit
3792b31fc7
4 changed files with 208 additions and 4 deletions
|
@ -5,9 +5,11 @@ import org.xbib.event.bus.SubscriberExceptionContext;
|
||||||
import org.xbib.event.bus.SubscriberExceptionHandler;
|
import org.xbib.event.bus.SubscriberExceptionHandler;
|
||||||
import org.xbib.event.clock.ClockEventManager;
|
import org.xbib.event.clock.ClockEventManager;
|
||||||
import org.xbib.event.io.file.FileFollowEventManager;
|
import org.xbib.event.io.file.FileFollowEventManager;
|
||||||
|
import org.xbib.event.io.path.PathEventManager;
|
||||||
import org.xbib.event.timer.TimerEventManager;
|
import org.xbib.event.timer.TimerEventManager;
|
||||||
import org.xbib.settings.Settings;
|
import org.xbib.settings.Settings;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.time.ZoneId;
|
import java.time.ZoneId;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -18,7 +20,7 @@ import java.util.concurrent.Executors;
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
public final class EventManager {
|
public final class EventManager implements Closeable {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(EventManager.class.getName());
|
private static final Logger logger = Logger.getLogger(EventManager.class.getName());
|
||||||
|
|
||||||
|
@ -40,10 +42,13 @@ public final class EventManager {
|
||||||
|
|
||||||
private final FileFollowEventManager fileFollowEventManager;
|
private final FileFollowEventManager fileFollowEventManager;
|
||||||
|
|
||||||
|
private final PathEventManager pathEventManager;
|
||||||
|
|
||||||
private EventManager(Settings settings) {
|
private EventManager(Settings settings) {
|
||||||
this.clockEventManager = new ClockEventManager(settings, eventBus, classLoader);
|
this.clockEventManager = new ClockEventManager(settings, eventBus, classLoader);
|
||||||
this.timerEventManager = new TimerEventManager(settings, eventBus, classLoader, ZoneId.systemDefault());
|
this.timerEventManager = new TimerEventManager(settings, eventBus, classLoader, ZoneId.systemDefault());
|
||||||
this.fileFollowEventManager = new FileFollowEventManager(settings, eventBus, executorService, classLoader);
|
this.fileFollowEventManager = new FileFollowEventManager(settings, eventBus, executorService, classLoader);
|
||||||
|
this.pathEventManager = new PathEventManager(settings, eventBus, executorService, classLoader);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static EventManager newEventManager(Settings settings) {
|
public static EventManager newEventManager(Settings settings) {
|
||||||
|
@ -72,8 +77,19 @@ public final class EventManager {
|
||||||
return fileFollowEventManager;
|
return fileFollowEventManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public PathEventManager getPathEventManager() {
|
||||||
|
return pathEventManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
return super.equals(obj);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
clockEventManager.close();
|
clockEventManager.close();
|
||||||
|
pathEventManager.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class EventManagerExceptionHandler implements SubscriberExceptionHandler {
|
private static class EventManagerExceptionHandler implements SubscriberExceptionHandler {
|
||||||
|
|
|
@ -32,7 +32,7 @@ public class FileFollowEventManager {
|
||||||
if (baseStr != null && patternStr != null) {
|
if (baseStr != null && patternStr != null) {
|
||||||
Path base = Paths.get(baseStr);
|
Path base = Paths.get(baseStr);
|
||||||
Pattern pattern = Pattern.compile(patternStr);
|
Pattern pattern = Pattern.compile(patternStr);
|
||||||
String className = definition.get("class", FileFollowEvent.class.getName());
|
String className = definition.get("class", DefaultFileFollowEvent.class.getName());
|
||||||
try {
|
try {
|
||||||
Class<? extends FileFollowEvent> eventClass = (Class<? extends FileFollowEvent>) classLoader.loadClass(className);
|
Class<? extends FileFollowEvent> eventClass = (Class<? extends FileFollowEvent>) classLoader.loadClass(className);
|
||||||
FileFollowEventService fileFollowEventService = new FileFollowEventService(definition, eventBus, base, pattern, eventClass);
|
FileFollowEventService fileFollowEventService = new FileFollowEventService(definition, eventBus, base, pattern, eventClass);
|
||||||
|
|
146
src/main/java/org/xbib/event/io/path/PathEventManager.java
Normal file
146
src/main/java/org/xbib/event/io/path/PathEventManager.java
Normal file
|
@ -0,0 +1,146 @@
|
||||||
|
package org.xbib.event.io.path;
|
||||||
|
|
||||||
|
import org.xbib.datastructures.api.TimeValue;
|
||||||
|
import org.xbib.datastructures.json.tiny.Json;
|
||||||
|
import org.xbib.event.bus.AsyncEventBus;
|
||||||
|
import org.xbib.settings.Settings;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.UncheckedIOException;
|
||||||
|
import java.io.Writer;
|
||||||
|
import java.nio.file.Files;
|
||||||
|
import java.nio.file.Path;
|
||||||
|
import java.nio.file.Paths;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.logging.Level;
|
||||||
|
import java.util.logging.Logger;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
public class PathEventManager implements Closeable {
|
||||||
|
|
||||||
|
private static final Logger logger = Logger.getLogger(PathEventManager.class.getName());
|
||||||
|
|
||||||
|
private final Path path;
|
||||||
|
|
||||||
|
private final Map<Future<?>, PathEventService> eventServiceMap;
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public PathEventManager(Settings settings,
|
||||||
|
AsyncEventBus eventBus,
|
||||||
|
ExecutorService executorService,
|
||||||
|
ClassLoader classLoader) {
|
||||||
|
this.eventServiceMap = new LinkedHashMap<>();
|
||||||
|
this.path = Paths.get(settings.get("event.pathevent.base", "/var/tmp/pathevent"));
|
||||||
|
for (Map.Entry<String, Settings> entry : settings.getGroups("event.path").entrySet()) {
|
||||||
|
try {
|
||||||
|
String name = entry.getKey();
|
||||||
|
Settings definition = entry.getValue();
|
||||||
|
int maxBytes = definition.getAsInt("maxfilesize", 10 * 1024 * 1024); // 10 MB
|
||||||
|
TimeValue lifetime = definition.getAsTime("lifetime", TimeValue.timeValueHours(72));
|
||||||
|
String className = definition.get("class", DefaultPathEvent.class.getName());
|
||||||
|
Class<? extends PathEvent> eventClass = (Class<? extends PathEvent>) classLoader.loadClass(className);
|
||||||
|
Path p = path.resolve(name);
|
||||||
|
createQueue(name, p);
|
||||||
|
PathEventService pathEventService = new PathEventService(eventBus, name, p, maxBytes, lifetime, eventClass);
|
||||||
|
Future<?> future = executorService.submit(pathEventService);
|
||||||
|
eventServiceMap.put(future, pathEventService);
|
||||||
|
logger.log(Level.INFO, "path event service " + entry.getKey() + " with path " + p + " and max " + maxBytes + " added, event class " + className);
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.log(Level.SEVERE, "unable to create event path service " + entry.getKey() + ", reason " + e.getMessage(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void createQueue(String name, Path p) throws IOException {
|
||||||
|
if (!Files.exists(p)) {
|
||||||
|
logger.log(Level.FINE, "creating queue " + name + " at " + p);
|
||||||
|
Files.createDirectories(p);
|
||||||
|
for (String s: List.of(PathEventService.INCOMING, PathEventService.SUCCESS, PathEventService.FAIL)) {
|
||||||
|
Path dir = p.resolve(s);
|
||||||
|
if (!Files.exists(dir)) {
|
||||||
|
logger.log(Level.FINE, "creating queue " + name + " dir " + dir);
|
||||||
|
Files.createDirectories(dir);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close() throws IOException {
|
||||||
|
logger.log(Level.INFO, "closing all path event services");
|
||||||
|
eventServiceMap.forEach((k, v) -> {
|
||||||
|
k.cancel(true);
|
||||||
|
try {
|
||||||
|
v.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
logger.log(Level.SEVERE, e.getMessage(), e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean put(String queue, String key, Map<String,Object> map) throws IOException {
|
||||||
|
return put(queue, key, ".json", Json.toString(map));
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean putIfNotExists(String queue, String key, Map<String,Object> map) throws IOException {
|
||||||
|
if (!exists(queue, key, ".json")) {
|
||||||
|
return put(queue, key, ".json", Json.toString(map));
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean put(String queue, String key, String suffix, String string) throws IOException {
|
||||||
|
String keyFileName = key + suffix;
|
||||||
|
Path queuePath = path.resolve(queue);
|
||||||
|
if (Files.exists(queuePath.resolve(PathEventService.INCOMING).resolve(keyFileName)) ||
|
||||||
|
Files.exists(queuePath.resolve(PathEventService.SUCCESS).resolve(keyFileName))) {
|
||||||
|
logger.log(Level.WARNING, "key " + key + " already exists");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
Path p = queuePath.resolve(PathEventService.INCOMING).resolve(keyFileName);
|
||||||
|
try (Writer writer = Files.newBufferedWriter(p)) {
|
||||||
|
writer.write(string);
|
||||||
|
}
|
||||||
|
eventServiceMap.forEach((k, v) -> {
|
||||||
|
if (v.getName().equals(queue)) {
|
||||||
|
try {
|
||||||
|
v.purge();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new UncheckedIOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean exists(String queue, String key, String suffix) {
|
||||||
|
String keyFileName = key + suffix;
|
||||||
|
Path queuePath = path.resolve(queue);
|
||||||
|
return Files.exists(queuePath.resolve(PathEventService.INCOMING).resolve(keyFileName)) ||
|
||||||
|
Files.exists(queuePath.resolve(PathEventService.SUCCESS).resolve(keyFileName));
|
||||||
|
}
|
||||||
|
|
||||||
|
public long sizeOfIncoming(String queue) throws IOException {
|
||||||
|
return sizeOf(path.resolve(queue).resolve(PathEventService.INCOMING));
|
||||||
|
}
|
||||||
|
|
||||||
|
public long sizeOfSuccess(String queue) throws IOException {
|
||||||
|
return sizeOf(path.resolve(queue).resolve(PathEventService.SUCCESS));
|
||||||
|
}
|
||||||
|
|
||||||
|
public long sizeOfFail(String queue) throws IOException {
|
||||||
|
return sizeOf(path.resolve(queue).resolve(PathEventService.FAIL));
|
||||||
|
}
|
||||||
|
|
||||||
|
public static long sizeOf(Path path) throws IOException {
|
||||||
|
try (Stream<Path> stream = Files.find(path, 1, (p, basicFileAttributes) -> Files.isRegularFile(p))) {
|
||||||
|
return stream.count();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,10 +1,12 @@
|
||||||
package org.xbib.event.io.path;
|
package org.xbib.event.io.path;
|
||||||
|
|
||||||
|
import org.xbib.datastructures.api.TimeValue;
|
||||||
import org.xbib.datastructures.json.tiny.Json;
|
import org.xbib.datastructures.json.tiny.Json;
|
||||||
import org.xbib.event.bus.EventBus;
|
import org.xbib.event.bus.EventBus;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.UncheckedIOException;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.nio.file.ClosedWatchServiceException;
|
import java.nio.file.ClosedWatchServiceException;
|
||||||
import java.nio.file.DirectoryStream;
|
import java.nio.file.DirectoryStream;
|
||||||
|
@ -14,7 +16,9 @@ import java.nio.file.StandardWatchEventKinds;
|
||||||
import java.nio.file.WatchEvent;
|
import java.nio.file.WatchEvent;
|
||||||
import java.nio.file.WatchKey;
|
import java.nio.file.WatchKey;
|
||||||
import java.nio.file.WatchService;
|
import java.nio.file.WatchService;
|
||||||
import java.util.LinkedHashMap;
|
import java.nio.file.attribute.BasicFileAttributeView;
|
||||||
|
import java.nio.file.attribute.BasicFileAttributes;
|
||||||
|
import java.time.Instant;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
@ -33,7 +37,11 @@ public class PathEventService implements Callable<Integer>, Closeable {
|
||||||
|
|
||||||
private final Path path;
|
private final Path path;
|
||||||
|
|
||||||
|
private final String name;
|
||||||
|
|
||||||
private final int maxFileSize;
|
private final int maxFileSize;
|
||||||
|
|
||||||
|
private final TimeValue lifetime;
|
||||||
;
|
;
|
||||||
private final Class<? extends PathEvent> pathEventClass;
|
private final Class<? extends PathEvent> pathEventClass;
|
||||||
|
|
||||||
|
@ -44,12 +52,16 @@ public class PathEventService implements Callable<Integer>, Closeable {
|
||||||
private volatile boolean keepWatching;
|
private volatile boolean keepWatching;
|
||||||
|
|
||||||
public PathEventService(EventBus eventBus,
|
public PathEventService(EventBus eventBus,
|
||||||
|
String name,
|
||||||
Path path,
|
Path path,
|
||||||
int maxFileSize,
|
int maxFileSize,
|
||||||
|
TimeValue lifetime,
|
||||||
Class<? extends PathEvent> pathEventClass) throws IOException {
|
Class<? extends PathEvent> pathEventClass) throws IOException {
|
||||||
this.eventBus = eventBus;
|
this.eventBus = eventBus;
|
||||||
|
this.name = name;
|
||||||
this.path = path;
|
this.path = path;
|
||||||
this.maxFileSize = maxFileSize;
|
this.maxFileSize = maxFileSize;
|
||||||
|
this.lifetime = lifetime;
|
||||||
this.pathEventClass = pathEventClass;
|
this.pathEventClass = pathEventClass;
|
||||||
drainIncoming();
|
drainIncoming();
|
||||||
this.watchService = path.getFileSystem().newWatchService();
|
this.watchService = path.getFileSystem().newWatchService();
|
||||||
|
@ -59,6 +71,11 @@ public class PathEventService implements Callable<Integer>, Closeable {
|
||||||
logger.log(Level.INFO, "path event service created for incoming files at " + path + " max file size = " + maxFileSize);
|
logger.log(Level.INFO, "path event service created for incoming files at " + path + " max file size = " + maxFileSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getName() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public Integer call() {
|
public Integer call() {
|
||||||
|
@ -102,7 +119,7 @@ public class PathEventService implements Callable<Integer>, Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void drainIncoming() throws IOException {
|
public void drainIncoming() throws IOException {
|
||||||
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path.resolve(INCOMING))) {
|
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path.resolve(INCOMING))) {
|
||||||
stream.forEach(path -> {
|
stream.forEach(path -> {
|
||||||
if (Files.isRegularFile(path)) {
|
if (Files.isRegularFile(path)) {
|
||||||
|
@ -114,6 +131,31 @@ public class PathEventService implements Callable<Integer>, Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void purge() throws IOException {
|
||||||
|
purge(path);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void purge(Path path) throws IOException {
|
||||||
|
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path)) {
|
||||||
|
stream.forEach(p -> {
|
||||||
|
try {
|
||||||
|
if (Files.isRegularFile(p)) {
|
||||||
|
BasicFileAttributeView view = Files.getFileAttributeView(p, BasicFileAttributeView.class);
|
||||||
|
BasicFileAttributes attrs = view.readAttributes();
|
||||||
|
if (Instant.now().minusMillis(attrs.lastModifiedTime().toMillis()).toEpochMilli() > lifetime.millis()) {
|
||||||
|
logger.log(Level.WARNING, "lifetime of " + lifetime + " exceeded, deleting " + p);
|
||||||
|
Files.delete(p);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
purge(p);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new UncheckedIOException(e);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void postEvent(String key, Path file) {
|
private void postEvent(String key, Path file) {
|
||||||
String base = getBase(key);
|
String base = getBase(key);
|
||||||
String suffix = getSuffix(key);
|
String suffix = getSuffix(key);
|
||||||
|
|
Loading…
Reference in a new issue