add path event

This commit is contained in:
Jörg Prante 2023-10-24 17:12:10 +02:00
parent f494db6721
commit 7805e987ed
7 changed files with 253 additions and 12 deletions

View file

@ -1,5 +1,5 @@
group = org.xbib group = org.xbib
name = event name = event
version = 0.0.3 version = 0.0.4
org.gradle.warning.mode = ALL org.gradle.warning.mode = ALL

View file

@ -1,9 +1,9 @@
dependencies { dependencies {
testImplementation libs.junit.jupiter.api testImplementation testLibs.junit.jupiter.api
testImplementation libs.junit.jupiter.params testImplementation testLibs.junit.jupiter.params
testImplementation libs.hamcrest testImplementation testLibs.hamcrest
testRuntimeOnly libs.junit.jupiter.engine testRuntimeOnly testLibs.junit.jupiter.engine
testRuntimeOnly libs.junit.jupiter.platform.launcher testRuntimeOnly testLibs.junit.jupiter.platform.launcher
} }
test { test {

View file

@ -17,14 +17,8 @@ dependencyResolutionManagement {
libs { libs {
version('gradle', '8.4') version('gradle', '8.4')
version('groovy', '4.0.13') version('groovy', '4.0.13')
version('junit', '5.10.0')
version('datastructures', '5.0.5') version('datastructures', '5.0.5')
version('net', '4.0.0') version('net', '4.0.0')
library('junit-jupiter-api', 'org.junit.jupiter', 'junit-jupiter-api').versionRef('junit')
library('junit-jupiter-params', 'org.junit.jupiter', 'junit-jupiter-params').versionRef('junit')
library('junit-jupiter-engine', 'org.junit.jupiter', 'junit-jupiter-engine').versionRef('junit')
library('junit-jupiter-platform-launcher', 'org.junit.platform', 'junit-platform-launcher').version('1.10.0')
library('hamcrest', 'org.hamcrest', 'hamcrest-library').version('2.2')
library('net', 'org.xbib', 'net').versionRef('net') library('net', 'org.xbib', 'net').versionRef('net')
library('datastructures-common', 'org.xbib', 'datastructures-common').versionRef('datastructures') library('datastructures-common', 'org.xbib', 'datastructures-common').versionRef('datastructures')
library('datastructures-json-tiny', 'org.xbib', 'datastructures-json-tiny').versionRef('datastructures') library('datastructures-json-tiny', 'org.xbib', 'datastructures-json-tiny').versionRef('datastructures')
@ -34,5 +28,13 @@ dependencyResolutionManagement {
library('reactivestreams', 'org.reactivestreams', 'reactive-streams').version('1.0.3') library('reactivestreams', 'org.reactivestreams', 'reactive-streams').version('1.0.3')
library('rxjava3', 'io.reactivex.rxjava3', 'rxjava').version('3.0.3') library('rxjava3', 'io.reactivex.rxjava3', 'rxjava').version('3.0.3')
} }
testLibs {
version('junit', '5.10.0')
library('junit-jupiter-api', 'org.junit.jupiter', 'junit-jupiter-api').versionRef('junit')
library('junit-jupiter-params', 'org.junit.jupiter', 'junit-jupiter-params').versionRef('junit')
library('junit-jupiter-engine', 'org.junit.jupiter', 'junit-jupiter-engine').versionRef('junit')
library('junit-jupiter-platform-launcher', 'org.junit.platform', 'junit-platform-launcher').version('1.10.0')
library('hamcrest', 'org.hamcrest', 'hamcrest-library').version('2.2')
}
} }
} }

View file

@ -3,6 +3,7 @@ module org.xbib.event {
exports org.xbib.event.clock; exports org.xbib.event.clock;
exports org.xbib.event.io; exports org.xbib.event.io;
exports org.xbib.event.io.file; exports org.xbib.event.io.file;
exports org.xbib.event.io.path;
exports org.xbib.event.loop.selector; exports org.xbib.event.loop.selector;
exports org.xbib.event.loop; exports org.xbib.event.loop;
exports org.xbib.event.persistence; exports org.xbib.event.persistence;

View file

@ -0,0 +1,66 @@
package org.xbib.event.io.path;
import org.xbib.event.DefaultEvent;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;
import java.time.Instant;
public class DefaultPathEvent extends DefaultEvent implements PathEvent {
private Path path;
private Path file;
private String suffix;
@Override
public void setPath(Path path) {
this.path = path;
}
@Override
public Path getPath() {
return path;
}
@Override
public void setFile(Path file) {
this.file = file;
}
@Override
public Path getFile() {
return file;
}
@Override
public void setSuffix(String suffix) {
this.suffix = suffix;
}
@Override
public String getSuffix() {
return suffix;
}
public void success() throws IOException {
Files.setLastModifiedTime(file, FileTime.from(Instant.now()));
Files.move(file, path.resolve(PathEventService.SUCCESS).resolve(file.getFileName()).toAbsolutePath(),
StandardCopyOption.REPLACE_EXISTING);
}
public void fail() throws IOException {
Files.setLastModifiedTime(file, FileTime.from(Instant.now()));
Files.move(file, path.resolve(PathEventService.FAIL).resolve(file.getFileName()).toAbsolutePath(),
StandardCopyOption.REPLACE_EXISTING);
}
@Override
public String toString() {
return "path=" + path + " file=" + file + " suffix=" + suffix + " map=" + getMap();
}
}

View file

@ -0,0 +1,21 @@
package org.xbib.event.io.path;
import org.xbib.event.Event;
import java.nio.file.Path;
public interface PathEvent extends Event {
void setPath(Path path);
Path getPath();
void setFile(Path file);
Path getFile();
void setSuffix(String suffix);
String getSuffix();
}

View file

@ -0,0 +1,151 @@
package org.xbib.event.io.path;
import org.xbib.datastructures.json.tiny.Json;
import org.xbib.event.bus.EventBus;
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.ClosedWatchServiceException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.LinkedHashMap;
import java.util.concurrent.Callable;
import java.util.logging.Level;
import java.util.logging.Logger;
public class PathEventService implements Callable<Integer>, Closeable {
private static final Logger logger = Logger.getLogger(PathEventService.class.getName());
public static final String INCOMING = "incoming";
public static final String SUCCESS = "success";
public static final String FAIL = "fail";
private final EventBus eventBus;
private final Path path;
private final int maxFileSize;
;
private final Class<? extends PathEvent> pathEventClass;
private final WatchService watchService;
private int eventCount;
private volatile boolean keepWatching;
public PathEventService(EventBus eventBus,
Path path,
int maxFileSize,
Class<? extends PathEvent> pathEventClass) throws IOException {
this.eventBus = eventBus;
this.path = path;
this.maxFileSize = maxFileSize;
this.pathEventClass = pathEventClass;
drainIncoming();
this.watchService = path.getFileSystem().newWatchService();
WatchEvent.Kind<?>[] kinds = new WatchEvent.Kind[] { StandardWatchEventKinds.ENTRY_CREATE };
WatchKey watchKey = path.resolve(INCOMING).register(watchService, kinds);
keepWatching = true;
logger.log(Level.INFO, "path event service created for incoming files at " + path + " max file size = " + maxFileSize);
}
@SuppressWarnings("unchecked")
@Override
public Integer call() {
try {
logger.log(Level.INFO, "watch service running on " + path.resolve(INCOMING));
while (keepWatching && watchService != null) {
WatchKey watchKey = watchService.take();
logger.log(Level.FINE, "received a watch key " + watchKey);
for (WatchEvent<?> watchEvent : watchKey.pollEvents()) {
WatchEvent.Kind<?> kind = watchEvent.kind();
if (kind == StandardWatchEventKinds.OVERFLOW) {
continue;
}
// we sleep here a bit, to give time to the OS to complete file writing
Thread.sleep(1000L);
WatchEvent<Path> pathWatchEvent = (WatchEvent<Path>) watchEvent;
String watchEventContext = pathWatchEvent.context().toString();
Path p = path.resolve(INCOMING).resolve(watchEventContext);
logger.log(Level.FINE, "watch event " + pathWatchEvent + " context = " + watchEventContext + " path = " + p);
postEvent(watchEventContext, p);
}
watchKey.reset();
}
} catch (ClosedWatchServiceException e) {
logger.log(Level.SEVERE, "closed watch key: " + e.getMessage(), e);
} catch (InterruptedException e) {
logger.log(Level.WARNING, "interrupted");
Thread.currentThread().interrupt();
} catch (Exception e) {
logger.log(Level.SEVERE, e.getMessage(), e);
}
return eventCount;
}
@Override
public void close() throws IOException {
keepWatching = false;
if (watchService != null) {
logger.log(Level.FINE, "closing watch service " + watchService);
watchService.close();
}
}
private void drainIncoming() throws IOException {
try (DirectoryStream<Path> stream = Files.newDirectoryStream(path.resolve(INCOMING))) {
stream.forEach(path -> {
if (Files.isRegularFile(path)) {
String key = path.getFileName().toString();
logger.log(Level.INFO, "while draining found key = " + key + " path = " + path);
postEvent(key, path);
}
});
}
}
private void postEvent(String key, Path file) {
String base = getBase(key);
String suffix = getSuffix(key);
try {
long fileSize = Files.size(file);
if (fileSize < maxFileSize && "json".equals(suffix)) {
byte[] b = Files.readAllBytes(file);
String json = new String(b, StandardCharsets.UTF_8);
PathEvent event = pathEventClass.getConstructor().newInstance();
event.setKey(base);
event.setFile(file);
event.setSuffix(suffix);
event.setPath(path); // remember directory for fail() and success()
event.setMap(Json.toMap(json));
logger.log(Level.FINE, "posting new event = " + event.getClass());
eventBus.post(event);
eventCount++;
} else {
logger.log(Level.SEVERE, "skipping post event because incoming file is too large, max file size = " + maxFileSize);
}
} catch (Exception e) {
logger.log(Level.SEVERE, e.getMessage(), e);
}
}
private static String getBase(String name) {
int pos = name.lastIndexOf('.');
return pos >= 0 ? name.substring(0, pos) : name;
}
private static String getSuffix(String name) {
int pos = name.lastIndexOf('.');
return pos >= 0 ? name.substring(pos + 1) : null;
}
}