working on path events

main
Jörg Prante 5 months ago
parent ef9528c1ad
commit 65d6f33edb

@ -13,7 +13,6 @@ import java.util.ServiceLoader;
import org.xbib.datastructures.json.tiny.Json; import org.xbib.datastructures.json.tiny.Json;
import org.xbib.event.Event; import org.xbib.event.Event;
import org.xbib.event.EventConsumer; import org.xbib.event.EventConsumer;
import org.xbib.event.FileFollowEvent;
import org.xbib.event.Listener; import org.xbib.event.Listener;
import org.xbib.event.Payload; import org.xbib.event.Payload;
import org.xbib.event.bus.AsyncEventBus; import org.xbib.event.bus.AsyncEventBus;
@ -424,9 +423,13 @@ public final class EventManager {
this.path = path; this.path = path;
base = getBase(path); base = getBase(path);
suffix = getSuffix(path); suffix = getSuffix(path);
fileSize = Files.size(path); if (Files.exists(path)) {
if (maxFileSize != -1L && fileSize > maxFileSize) { fileSize = Files.size(path);
throw new IOException("file size too large"); if (maxFileSize != -1L && fileSize > maxFileSize) {
throw new IOException("file size too large");
}
} else {
fileSize = -1L;
} }
return this; return this;
} }

@ -32,8 +32,6 @@ public class PathEventManagerService implements EventManagerService {
private ExecutorService executorService; private ExecutorService executorService;
private Path path;
private Map<Future<?>, PathEventService> eventServiceMap; private Map<Future<?>, PathEventService> eventServiceMap;
private List<String> suspendedQueues; private List<String> suspendedQueues;
@ -41,24 +39,21 @@ public class PathEventManagerService implements EventManagerService {
public PathEventManagerService() { public PathEventManagerService() {
} }
@SuppressWarnings("unchecked")
@Override @Override
public PathEventManagerService init(EventManager eventManager) { public PathEventManagerService init(EventManager eventManager) {
Settings settings = eventManager.getSettings(); Settings settings = eventManager.getSettings();
ClassLoader classLoader = eventManager.getClassLoader();
this.eventBus = eventManager.getEventBus(); this.eventBus = eventManager.getEventBus();
this.executorService = eventManager.getExecutorService(); this.executorService = eventManager.getExecutorService();
this.eventServiceMap = new LinkedHashMap<>(); this.eventServiceMap = new LinkedHashMap<>();
this.suspendedQueues = new ArrayList<>(); this.suspendedQueues = new ArrayList<>();
this.path = Paths.get(settings.get("event.pathevent.base", "/var/tmp/pathevent"));
for (Map.Entry<String, Settings> entry : settings.getGroups("event.path").entrySet()) { for (Map.Entry<String, Settings> entry : settings.getGroups("event.path").entrySet()) {
try { try {
String name = entry.getKey(); String name = entry.getKey();
Settings definition = entry.getValue(); Settings definition = entry.getValue();
if (definition.getAsBoolean("enabled", true)) { if (definition.getAsBoolean("enabled", true)) {
Path path = Paths.get(definition.get("path", "/var/tmp/" + name));
TimeValue lifetime = definition.getAsTime("lifetime", TimeValue.timeValueHours(72)); TimeValue lifetime = definition.getAsTime("lifetime", TimeValue.timeValueHours(72));
Path p = path.resolve(name); createPathEventService(name, path, lifetime);
createPathEventService(name, p, lifetime);
} else { } else {
logger.log(Level.WARNING, "path servive definition not enabled in configuration"); logger.log(Level.WARNING, "path servive definition not enabled in configuration");
} }
@ -109,60 +104,56 @@ public class PathEventManagerService implements EventManagerService {
suspendedQueues.remove(queue); suspendedQueues.remove(queue);
} }
public boolean put(String queue, String key, Map<String,Object> map) throws IOException { public boolean put(Path path, String key, Map<String,Object> map) throws IOException {
return put(queue, key, ".json", Json.toString(map)); return put(path, key, ".json", Json.toString(map));
} }
public boolean putIfNotExists(String queue, String key, Map<String,Object> map) throws IOException { public boolean putIfNotExists(Path path, String key, Map<String,Object> map) throws IOException {
if (!exists(queue, key, ".json")) { if (!exists(path, key, ".json")) {
return put(queue, key, ".json", Json.toString(map)); return put(path, key, ".json", Json.toString(map));
} else { } else {
return false; return false;
} }
} }
public boolean put(String queue, String key, String suffix, String string) throws IOException { public boolean put(Path path, String key, String suffix, String string) throws IOException {
String keyFileName = key + suffix; String keyFileName = key + suffix;
Path queuePath = path.resolve(queue); if (Files.exists(path.resolve(Event.INCOMING).resolve(keyFileName)) ||
if (Files.exists(queuePath.resolve(Event.INCOMING).resolve(keyFileName)) || Files.exists(path.resolve(Event.SUCCESS).resolve(keyFileName))) {
Files.exists(queuePath.resolve(Event.SUCCESS).resolve(keyFileName))) {
logger.log(Level.WARNING, "key " + key + " already exists"); logger.log(Level.WARNING, "key " + key + " already exists");
return false; return false;
} }
Path p = queuePath.resolve(Event.INCOMING).resolve(keyFileName); Path p = path.resolve(Event.INCOMING).resolve(keyFileName);
try (Writer writer = Files.newBufferedWriter(p)) { try (Writer writer = Files.newBufferedWriter(p)) {
writer.write(string); writer.write(string);
} }
// obligatory purge. This is hacky. // obligatory purge. This is hacky.
eventServiceMap.forEach((k, v) -> { eventServiceMap.forEach((k, v) -> {
if (v.getName().equals(queue)) { try {
try { v.purge();
v.purge(); } catch (IOException e) {
} catch (IOException e) { throw new UncheckedIOException(e);
throw new UncheckedIOException(e);
}
} }
}); });
return true; return true;
} }
public boolean exists(String queue, String key, String suffix) { public boolean exists(Path path, String key, String suffix) {
String keyFileName = key + suffix; String keyFileName = key + suffix;
Path queuePath = path.resolve(queue); return Files.exists(path.resolve(Event.INCOMING).resolve(keyFileName)) ||
return Files.exists(queuePath.resolve(Event.INCOMING).resolve(keyFileName)) || Files.exists(path.resolve(Event.SUCCESS).resolve(keyFileName));
Files.exists(queuePath.resolve(Event.SUCCESS).resolve(keyFileName));
} }
public long sizeOfIncoming(String queue) throws IOException { public long sizeOfIncoming(Path path) throws IOException {
return sizeOf(path.resolve(queue).resolve(Event.INCOMING)); return sizeOf(path.resolve(Event.INCOMING));
} }
public long sizeOfSuccess(String queue) throws IOException { public long sizeOfSuccess(Path path) throws IOException {
return sizeOf(path.resolve(queue).resolve(Event.SUCCESS)); return sizeOf(path.resolve(Event.SUCCESS));
} }
public long sizeOfFail(String queue) throws IOException { public long sizeOfFail(Path path) throws IOException {
return sizeOf(path.resolve(queue).resolve(Event.FAIL)); return sizeOf(path.resolve(Event.FAIL));
} }
public static long sizeOf(Path path) throws IOException { public static long sizeOf(Path path) throws IOException {

@ -16,13 +16,14 @@ import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
public class GenericEventManagerTest { public class GenericEventManagerTest {
private static final Logger logger = Logger.getLogger(GenericEventManagerTest.class.getName()); private static final Logger logger = Logger.getLogger(GenericEventManagerTest.class.getName());
@Test @Test
void testSimpleGenericEvent() { void testSimpleGenericEvent() throws InterruptedException {
TestEventConsumer consumer = new TestEventConsumer(); TestEventConsumer consumer = new TestEventConsumer();
EventManager eventManager = EventManager.builder() EventManager eventManager = EventManager.builder()
.register(consumer) .register(consumer)
@ -32,10 +33,11 @@ public class GenericEventManagerTest {
.setListener(e -> logger.log(Level.INFO, "received event " + e)) .setListener(e -> logger.log(Level.INFO, "received event " + e))
.build(); .build();
eventManager.getGenericEventManagerService().post(event); eventManager.getGenericEventManagerService().post(event);
// we must wait for a certain time because we do not use a future
Thread.sleep(500L);
assertEquals(1, consumer.getCount()); assertEquals(1, consumer.getCount());
} }
@Test @Test
void testGenericEventWithWaitForSubscriber() throws InterruptedException, ExecutionException, TimeoutException { void testGenericEventWithWaitForSubscriber() throws InterruptedException, ExecutionException, TimeoutException {
TestEventConsumer consumer = new TestEventConsumer(); TestEventConsumer consumer = new TestEventConsumer();
@ -52,6 +54,7 @@ public class GenericEventManagerTest {
.build(); .build();
eventManager.getGenericEventManagerService().post(event); eventManager.getGenericEventManagerService().post(event);
Event e = future.get(1000L, TimeUnit.MILLISECONDS); Event e = future.get(1000L, TimeUnit.MILLISECONDS);
assertNotNull(e);
} }
@Test @Test
@ -76,8 +79,9 @@ public class GenericEventManagerTest {
eventManager.getGenericEventManagerService().post((GenericEventImpl) event, future); eventManager.getGenericEventManagerService().post((GenericEventImpl) event, future);
Event e = future.get(5L, TimeUnit.SECONDS); Event e = future.get(5L, TimeUnit.SECONDS);
if (e != null) { if (e != null) {
logger.log(Level.INFO, "the event " + e + " was received by all consumers, continuing"); logger.log(Level.INFO, "the event " + e + " was received by all consumers");
} }
assertNotNull(e);
} }
private static class TestEventConsumer implements EventConsumer { private static class TestEventConsumer implements EventConsumer {

@ -28,13 +28,14 @@ public class FileFollowEventManagerTest {
.setSettings(settings) .setSettings(settings)
.register(consumer) .register(consumer)
.build(); .build();
Thread.sleep(5000L); Thread.sleep(1000L);
try (BufferedWriter bufferedWriter = Files.newBufferedWriter(path.resolve("test.txt"))) { Path testTxt = path.resolve("test.txt");
try (BufferedWriter bufferedWriter = Files.newBufferedWriter(testTxt)) {
bufferedWriter.write("Hello"); bufferedWriter.write("Hello");
logger.log(Level.INFO, "Hello written"); logger.log(Level.INFO, "Hello written");
} }
Thread.sleep(5000L); Thread.sleep(1000L);
Files.delete(path.resolve("test.txt")); Files.delete(testTxt);
Files.delete(path); Files.delete(path);
eventManager.close(); eventManager.close();
} }

@ -0,0 +1,41 @@
package org.xbib.event.path;
import org.junit.jupiter.api.Test;
import org.xbib.event.common.EventManager;
import org.xbib.settings.Settings;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.logging.Level;
import java.util.logging.Logger;
public class PathEventManagerTest {
private static final Logger logger = Logger.getLogger(PathEventManagerTest.class.getName());
@Test
public void testPathEvents() throws IOException, InterruptedException {
Path path = Files.createTempDirectory("testpath");
TestPathEventConsumer consumer = new TestPathEventConsumer();
Settings settings = Settings.settingsBuilder()
.put("event.path.testpathevent.enabled", "true")
.put("event.path.testpathevent.path", path.toString())
.build();
EventManager eventManager = EventManager.builder()
.setSettings(settings)
.register(consumer)
.build();
Thread.sleep(1000L);
Path testTxt = path.resolve("incoming").resolve("test.txt");
try (BufferedWriter bufferedWriter = Files.newBufferedWriter(testTxt)) {
bufferedWriter.write("Hello");
logger.log(Level.INFO, "Hello written");
}
Thread.sleep(1000L);
Files.delete(testTxt);
Files.delete(path);
eventManager.close();
}
}

@ -0,0 +1,20 @@
package org.xbib.event.path;
import org.xbib.event.EventConsumer;
import org.xbib.event.PathEvent;
import org.xbib.event.bus.AllowConcurrentEvents;
import org.xbib.event.bus.Subscribe;
import java.util.logging.Level;
import java.util.logging.Logger;
public class TestPathEventConsumer implements EventConsumer {
private static final Logger logger = Logger.getLogger(TestPathEventConsumer.class.getName());
@Subscribe
@AllowConcurrentEvents
void onEvent(PathEvent event) {
logger.log(Level.INFO, "received path event, path = " + event.getPath() + " content = " + event.getMessage());
}
}
Loading…
Cancel
Save