From 7805e987ed8593fd8c01d79d041bbfba8beb4927 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Prante?= Date: Tue, 24 Oct 2023 17:12:10 +0200 Subject: [PATCH] add path event --- gradle.properties | 2 +- gradle/test/junit5.gradle | 10 +- settings.gradle | 14 +- src/main/java/module-info.java | 1 + .../xbib/event/io/path/DefaultPathEvent.java | 66 ++++++++ .../org/xbib/event/io/path/PathEvent.java | 21 +++ .../xbib/event/io/path/PathEventService.java | 151 ++++++++++++++++++ 7 files changed, 253 insertions(+), 12 deletions(-) create mode 100644 src/main/java/org/xbib/event/io/path/DefaultPathEvent.java create mode 100644 src/main/java/org/xbib/event/io/path/PathEvent.java create mode 100644 src/main/java/org/xbib/event/io/path/PathEventService.java diff --git a/gradle.properties b/gradle.properties index e810c66..607456a 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ group = org.xbib name = event -version = 0.0.3 +version = 0.0.4 org.gradle.warning.mode = ALL diff --git a/gradle/test/junit5.gradle b/gradle/test/junit5.gradle index e356f30..6cace6f 100644 --- a/gradle/test/junit5.gradle +++ b/gradle/test/junit5.gradle @@ -1,9 +1,9 @@ dependencies { - testImplementation libs.junit.jupiter.api - testImplementation libs.junit.jupiter.params - testImplementation libs.hamcrest - testRuntimeOnly libs.junit.jupiter.engine - testRuntimeOnly libs.junit.jupiter.platform.launcher + testImplementation testLibs.junit.jupiter.api + testImplementation testLibs.junit.jupiter.params + testImplementation testLibs.hamcrest + testRuntimeOnly testLibs.junit.jupiter.engine + testRuntimeOnly testLibs.junit.jupiter.platform.launcher } test { diff --git a/settings.gradle b/settings.gradle index 3c48ee8..563fc0d 100644 --- a/settings.gradle +++ b/settings.gradle @@ -17,14 +17,8 @@ dependencyResolutionManagement { libs { version('gradle', '8.4') version('groovy', '4.0.13') - version('junit', '5.10.0') version('datastructures', '5.0.5') version('net', '4.0.0') - library('junit-jupiter-api', 'org.junit.jupiter', 'junit-jupiter-api').versionRef('junit') - library('junit-jupiter-params', 'org.junit.jupiter', 'junit-jupiter-params').versionRef('junit') - library('junit-jupiter-engine', 'org.junit.jupiter', 'junit-jupiter-engine').versionRef('junit') - library('junit-jupiter-platform-launcher', 'org.junit.platform', 'junit-platform-launcher').version('1.10.0') - library('hamcrest', 'org.hamcrest', 'hamcrest-library').version('2.2') library('net', 'org.xbib', 'net').versionRef('net') library('datastructures-common', 'org.xbib', 'datastructures-common').versionRef('datastructures') library('datastructures-json-tiny', 'org.xbib', 'datastructures-json-tiny').versionRef('datastructures') @@ -34,5 +28,13 @@ dependencyResolutionManagement { library('reactivestreams', 'org.reactivestreams', 'reactive-streams').version('1.0.3') library('rxjava3', 'io.reactivex.rxjava3', 'rxjava').version('3.0.3') } + testLibs { + version('junit', '5.10.0') + library('junit-jupiter-api', 'org.junit.jupiter', 'junit-jupiter-api').versionRef('junit') + library('junit-jupiter-params', 'org.junit.jupiter', 'junit-jupiter-params').versionRef('junit') + library('junit-jupiter-engine', 'org.junit.jupiter', 'junit-jupiter-engine').versionRef('junit') + library('junit-jupiter-platform-launcher', 'org.junit.platform', 'junit-platform-launcher').version('1.10.0') + library('hamcrest', 'org.hamcrest', 'hamcrest-library').version('2.2') + } } } diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java index db430b8..a3c4e4e 100644 --- a/src/main/java/module-info.java +++ b/src/main/java/module-info.java @@ -3,6 +3,7 @@ module org.xbib.event { exports org.xbib.event.clock; exports org.xbib.event.io; exports org.xbib.event.io.file; + exports org.xbib.event.io.path; exports org.xbib.event.loop.selector; exports org.xbib.event.loop; exports org.xbib.event.persistence; diff --git a/src/main/java/org/xbib/event/io/path/DefaultPathEvent.java b/src/main/java/org/xbib/event/io/path/DefaultPathEvent.java new file mode 100644 index 0000000..383951d --- /dev/null +++ b/src/main/java/org/xbib/event/io/path/DefaultPathEvent.java @@ -0,0 +1,66 @@ +package org.xbib.event.io.path; + +import org.xbib.event.DefaultEvent; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.attribute.FileTime; +import java.time.Instant; + +public class DefaultPathEvent extends DefaultEvent implements PathEvent { + + private Path path; + + private Path file; + + private String suffix; + + @Override + public void setPath(Path path) { + this.path = path; + } + + @Override + public Path getPath() { + return path; + } + + @Override + public void setFile(Path file) { + this.file = file; + } + + @Override + public Path getFile() { + return file; + } + + @Override + public void setSuffix(String suffix) { + this.suffix = suffix; + } + + @Override + public String getSuffix() { + return suffix; + } + + public void success() throws IOException { + Files.setLastModifiedTime(file, FileTime.from(Instant.now())); + Files.move(file, path.resolve(PathEventService.SUCCESS).resolve(file.getFileName()).toAbsolutePath(), + StandardCopyOption.REPLACE_EXISTING); + } + + public void fail() throws IOException { + Files.setLastModifiedTime(file, FileTime.from(Instant.now())); + Files.move(file, path.resolve(PathEventService.FAIL).resolve(file.getFileName()).toAbsolutePath(), + StandardCopyOption.REPLACE_EXISTING); + } + + @Override + public String toString() { + return "path=" + path + " file=" + file + " suffix=" + suffix + " map=" + getMap(); + } +} diff --git a/src/main/java/org/xbib/event/io/path/PathEvent.java b/src/main/java/org/xbib/event/io/path/PathEvent.java new file mode 100644 index 0000000..fb73693 --- /dev/null +++ b/src/main/java/org/xbib/event/io/path/PathEvent.java @@ -0,0 +1,21 @@ +package org.xbib.event.io.path; + +import org.xbib.event.Event; + +import java.nio.file.Path; + +public interface PathEvent extends Event { + + void setPath(Path path); + + Path getPath(); + + void setFile(Path file); + + Path getFile(); + + void setSuffix(String suffix); + + String getSuffix(); + +} diff --git a/src/main/java/org/xbib/event/io/path/PathEventService.java b/src/main/java/org/xbib/event/io/path/PathEventService.java new file mode 100644 index 0000000..1925cc0 --- /dev/null +++ b/src/main/java/org/xbib/event/io/path/PathEventService.java @@ -0,0 +1,151 @@ +package org.xbib.event.io.path; + +import org.xbib.datastructures.json.tiny.Json; +import org.xbib.event.bus.EventBus; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.ClosedWatchServiceException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardWatchEventKinds; +import java.nio.file.WatchEvent; +import java.nio.file.WatchKey; +import java.nio.file.WatchService; +import java.util.LinkedHashMap; +import java.util.concurrent.Callable; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class PathEventService implements Callable, Closeable { + + private static final Logger logger = Logger.getLogger(PathEventService.class.getName()); + + public static final String INCOMING = "incoming"; + + public static final String SUCCESS = "success"; + + public static final String FAIL = "fail"; + + private final EventBus eventBus; + + private final Path path; + + private final int maxFileSize; +; + private final Class pathEventClass; + + private final WatchService watchService; + + private int eventCount; + + private volatile boolean keepWatching; + + public PathEventService(EventBus eventBus, + Path path, + int maxFileSize, + Class pathEventClass) throws IOException { + this.eventBus = eventBus; + this.path = path; + this.maxFileSize = maxFileSize; + this.pathEventClass = pathEventClass; + drainIncoming(); + this.watchService = path.getFileSystem().newWatchService(); + WatchEvent.Kind[] kinds = new WatchEvent.Kind[] { StandardWatchEventKinds.ENTRY_CREATE }; + WatchKey watchKey = path.resolve(INCOMING).register(watchService, kinds); + keepWatching = true; + logger.log(Level.INFO, "path event service created for incoming files at " + path + " max file size = " + maxFileSize); + } + + @SuppressWarnings("unchecked") + @Override + public Integer call() { + try { + logger.log(Level.INFO, "watch service running on " + path.resolve(INCOMING)); + while (keepWatching && watchService != null) { + WatchKey watchKey = watchService.take(); + logger.log(Level.FINE, "received a watch key " + watchKey); + for (WatchEvent watchEvent : watchKey.pollEvents()) { + WatchEvent.Kind kind = watchEvent.kind(); + if (kind == StandardWatchEventKinds.OVERFLOW) { + continue; + } + // we sleep here a bit, to give time to the OS to complete file writing + Thread.sleep(1000L); + WatchEvent pathWatchEvent = (WatchEvent) watchEvent; + String watchEventContext = pathWatchEvent.context().toString(); + Path p = path.resolve(INCOMING).resolve(watchEventContext); + logger.log(Level.FINE, "watch event " + pathWatchEvent + " context = " + watchEventContext + " path = " + p); + postEvent(watchEventContext, p); + } + watchKey.reset(); + } + } catch (ClosedWatchServiceException e) { + logger.log(Level.SEVERE, "closed watch key: " + e.getMessage(), e); + } catch (InterruptedException e) { + logger.log(Level.WARNING, "interrupted"); + Thread.currentThread().interrupt(); + } catch (Exception e) { + logger.log(Level.SEVERE, e.getMessage(), e); + } + return eventCount; + } + + @Override + public void close() throws IOException { + keepWatching = false; + if (watchService != null) { + logger.log(Level.FINE, "closing watch service " + watchService); + watchService.close(); + } + } + + private void drainIncoming() throws IOException { + try (DirectoryStream stream = Files.newDirectoryStream(path.resolve(INCOMING))) { + stream.forEach(path -> { + if (Files.isRegularFile(path)) { + String key = path.getFileName().toString(); + logger.log(Level.INFO, "while draining found key = " + key + " path = " + path); + postEvent(key, path); + } + }); + } + } + + private void postEvent(String key, Path file) { + String base = getBase(key); + String suffix = getSuffix(key); + try { + long fileSize = Files.size(file); + if (fileSize < maxFileSize && "json".equals(suffix)) { + byte[] b = Files.readAllBytes(file); + String json = new String(b, StandardCharsets.UTF_8); + PathEvent event = pathEventClass.getConstructor().newInstance(); + event.setKey(base); + event.setFile(file); + event.setSuffix(suffix); + event.setPath(path); // remember directory for fail() and success() + event.setMap(Json.toMap(json)); + logger.log(Level.FINE, "posting new event = " + event.getClass()); + eventBus.post(event); + eventCount++; + } else { + logger.log(Level.SEVERE, "skipping post event because incoming file is too large, max file size = " + maxFileSize); + } + } catch (Exception e) { + logger.log(Level.SEVERE, e.getMessage(), e); + } + } + + private static String getBase(String name) { + int pos = name.lastIndexOf('.'); + return pos >= 0 ? name.substring(0, pos) : name; + } + + private static String getSuffix(String name) { + int pos = name.lastIndexOf('.'); + return pos >= 0 ? name.substring(pos + 1) : null; + } +}