add file follow event test
This commit is contained in:
parent
7bb652dba4
commit
976e29947c
6 changed files with 67 additions and 47 deletions
|
@ -8,6 +8,8 @@ public class DefaultFileFollowEvent extends DefaultEvent implements FileFollowEv
|
||||||
|
|
||||||
private Path path;
|
private Path path;
|
||||||
|
|
||||||
|
private String content;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setPath(Path path) {
|
public void setPath(Path path) {
|
||||||
this.path = path;
|
this.path = path;
|
||||||
|
@ -17,4 +19,14 @@ public class DefaultFileFollowEvent extends DefaultEvent implements FileFollowEv
|
||||||
public Path getPath() {
|
public Path getPath() {
|
||||||
return path;
|
return path;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setContent(String content) {
|
||||||
|
this.content = content;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getContent() {
|
||||||
|
return content;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,4 +9,8 @@ public interface FileFollowEvent extends Event {
|
||||||
void setPath(Path path);
|
void setPath(Path path);
|
||||||
|
|
||||||
Path getPath();
|
Path getPath();
|
||||||
|
|
||||||
|
void setContent(String content);
|
||||||
|
|
||||||
|
String getContent();
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,7 @@ public class FileFollowEventManager {
|
||||||
ExecutorService executorService,
|
ExecutorService executorService,
|
||||||
ClassLoader classLoader) {
|
ClassLoader classLoader) {
|
||||||
this.eventServiceMap = new LinkedHashMap<>();
|
this.eventServiceMap = new LinkedHashMap<>();
|
||||||
for (Map.Entry<String, Settings> followfiles : settings.getGroups("filefollow").entrySet()) {
|
for (Map.Entry<String, Settings> followfiles : settings.getGroups("event.filefollow").entrySet()) {
|
||||||
Settings definition = followfiles.getValue();
|
Settings definition = followfiles.getValue();
|
||||||
String baseStr = definition.get("base");
|
String baseStr = definition.get("base");
|
||||||
String patternStr = definition.get("pattern");
|
String patternStr = definition.get("pattern");
|
||||||
|
|
|
@ -5,6 +5,7 @@ import org.xbib.settings.Settings;
|
||||||
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.io.UncheckedIOException;
|
||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.CharBuffer;
|
import java.nio.CharBuffer;
|
||||||
import java.nio.channels.SeekableByteChannel;
|
import java.nio.channels.SeekableByteChannel;
|
||||||
|
@ -24,13 +25,12 @@ import java.util.logging.Level;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
public class FileFollowEventService implements Callable<Integer>, Closeable {
|
public class FileFollowEventService implements Callable<Integer>, Closeable {
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(FileFollowEventService.class.getName());
|
private static final Logger logger = Logger.getLogger(FileFollowEventService.class.getName());
|
||||||
|
|
||||||
private final Settings settings;
|
|
||||||
|
|
||||||
private final EventBus eventBus;
|
private final EventBus eventBus;
|
||||||
|
|
||||||
private final Path base;
|
private final Path base;
|
||||||
|
@ -41,12 +41,8 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
|
||||||
|
|
||||||
private final WatchService watchService;
|
private final WatchService watchService;
|
||||||
|
|
||||||
private final WatchKey watchKey;
|
|
||||||
|
|
||||||
private final Map<Path, Long> fileSizes;
|
private final Map<Path, Long> fileSizes;
|
||||||
|
|
||||||
private int eventCount;
|
|
||||||
|
|
||||||
private volatile boolean keepWatching;
|
private volatile boolean keepWatching;
|
||||||
|
|
||||||
public FileFollowEventService(Settings settings,
|
public FileFollowEventService(Settings settings,
|
||||||
|
@ -54,7 +50,6 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
|
||||||
Path base,
|
Path base,
|
||||||
Pattern pattern,
|
Pattern pattern,
|
||||||
Class<? extends FileFollowEvent> eventClass) throws IOException {
|
Class<? extends FileFollowEvent> eventClass) throws IOException {
|
||||||
this.settings = settings;
|
|
||||||
this.eventBus = eventBus;
|
this.eventBus = eventBus;
|
||||||
this.base = base;
|
this.base = base;
|
||||||
this.pattern = pattern;
|
this.pattern = pattern;
|
||||||
|
@ -63,9 +58,9 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
|
||||||
this.watchService = fileSystem.newWatchService();
|
this.watchService = fileSystem.newWatchService();
|
||||||
WatchEvent.Kind<?>[] kinds = new WatchEvent.Kind<?>[1];
|
WatchEvent.Kind<?>[] kinds = new WatchEvent.Kind<?>[1];
|
||||||
kinds[0] = StandardWatchEventKinds.ENTRY_MODIFY;
|
kinds[0] = StandardWatchEventKinds.ENTRY_MODIFY;
|
||||||
this.watchKey = base.register(watchService, kinds);
|
base.register(watchService, kinds);
|
||||||
// limit file size memory to 32 files
|
this.fileSizes = new LinkedHashMap<>();
|
||||||
this.fileSizes = new LimitedMap<>(32);
|
fillFileSizes(base, pattern);
|
||||||
this.keepWatching = true;
|
this.keepWatching = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -85,6 +80,7 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
|
||||||
Path path = pathWatchEvent.context();
|
Path path = pathWatchEvent.context();
|
||||||
Matcher matcher = pattern.matcher(path.toString());
|
Matcher matcher = pattern.matcher(path.toString());
|
||||||
if (!matcher.matches()) {
|
if (!matcher.matches()) {
|
||||||
|
logger.log(Level.FINE, "no match of " + path + " to pattern " + pattern);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
Path p = base.resolve(path);
|
Path p = base.resolve(path);
|
||||||
|
@ -92,22 +88,14 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
|
||||||
long lastSize = fileSizes.getOrDefault(p, 0L);
|
long lastSize = fileSizes.getOrDefault(p, 0L);
|
||||||
long currentSize = p.toFile().length();
|
long currentSize = p.toFile().length();
|
||||||
fileSizes.put(p, currentSize);
|
fileSizes.put(p, currentSize);
|
||||||
// We have no idea where to start reading if this is the first time.
|
String content = readRange(channel, lastSize, currentSize);
|
||||||
// Avoid reading the whole file at first time, read only real diff.
|
// split content by line, this allows pattern matching without preprocessing in worker
|
||||||
// This means first event is swallowed!
|
for (String line : content.split("\n")) {
|
||||||
if (lastSize > 0L) {
|
FileFollowEvent event = eventClass.getDeclaredConstructor().newInstance();
|
||||||
String content = readRange(channel, lastSize, currentSize);
|
event.setKey(base.toString());
|
||||||
// split content by line, this allows pattern matching without preprocessing in worker
|
event.setPath(path);
|
||||||
for (String line : content.split("\n")) {
|
event.setContent(line);
|
||||||
FileFollowEvent event = eventClass.getDeclaredConstructor().newInstance();
|
eventBus.post(event);
|
||||||
event.setKey(path.toString());
|
|
||||||
event.setPath(base);
|
|
||||||
event.setMap(new LinkedHashMap<>());
|
|
||||||
event.getMap().putAll(settings.getAsStructuredMap());
|
|
||||||
event.getMap().put("content", line);
|
|
||||||
eventBus.post(event);
|
|
||||||
eventCount++;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -121,7 +109,7 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return eventCount;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -138,26 +126,29 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
|
||||||
fileChannel.position(from);
|
fileChannel.position(from);
|
||||||
int numRead = fileChannel.read(byteBuffer);
|
int numRead = fileChannel.read(byteBuffer);
|
||||||
byteBuffer.flip();
|
byteBuffer.flip();
|
||||||
CharBuffer chb = StandardCharsets.UTF_8.decode(byteBuffer);
|
CharBuffer charBuffer = StandardCharsets.UTF_8.decode(byteBuffer);
|
||||||
byteBuffer.clear();
|
byteBuffer.clear();
|
||||||
if (numRead <= 0) {
|
if (numRead <= 0) {
|
||||||
throw new IOException("numRead less or equal to 0");
|
throw new IOException("numRead less or equal to 0");
|
||||||
}
|
}
|
||||||
return chb.toString();
|
return charBuffer.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class LimitedMap<K, V> extends LinkedHashMap<K, V> {
|
private void fillFileSizes(Path base, Pattern pattern) throws IOException {
|
||||||
|
if (!Files.exists(base)) {
|
||||||
private final int n;
|
return;
|
||||||
|
|
||||||
LimitedMap(int n) {
|
|
||||||
super();
|
|
||||||
this.n = n;
|
|
||||||
}
|
}
|
||||||
|
try (Stream<Path> path = Files.walk(base)) {
|
||||||
@Override
|
path.forEach(p -> {
|
||||||
protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
|
Matcher matcher = pattern.matcher(p.toString());
|
||||||
return size() > n;
|
if (matcher.matches()) {
|
||||||
|
try {
|
||||||
|
fileSizes.put(p, Files.size(p));
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new UncheckedIOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,27 +6,40 @@ import org.xbib.event.timer.TestTimerEventConsumer;
|
||||||
import org.xbib.event.timer.TimerEventManager;
|
import org.xbib.event.timer.TimerEventManager;
|
||||||
import org.xbib.settings.Settings;
|
import org.xbib.settings.Settings;
|
||||||
|
|
||||||
|
import java.io.BufferedWriter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.file.Files;
|
import java.nio.file.Files;
|
||||||
import java.nio.file.Path;
|
import java.nio.file.Path;
|
||||||
import java.time.Instant;
|
import java.time.Instant;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.logging.Level;
|
||||||
|
import java.util.logging.Logger;
|
||||||
|
|
||||||
public class FileFollowEventManagerTest {
|
public class FileFollowEventManagerTest {
|
||||||
|
|
||||||
|
private static final Logger logger = Logger.getLogger(FileFollowEventManagerTest.class.getName());
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFileFollowEvents() throws IOException, InterruptedException {
|
public void testFileFollowEvents() throws IOException, InterruptedException {
|
||||||
Path path = Files.createTempDirectory("testfilefollow");
|
Path path = Files.createTempDirectory("testfilefollow");
|
||||||
TestFileFollowEventConsumer consumer = new TestFileFollowEventConsumer();
|
TestFileFollowEventConsumer consumer = new TestFileFollowEventConsumer();
|
||||||
EventManager.register(consumer);
|
EventManager.register(consumer);
|
||||||
Settings settings = Settings.settingsBuilder()
|
Settings settings = Settings.settingsBuilder()
|
||||||
.put("event.timer.testfilefollowevent.enabled", "true")
|
.put("event.filefollow.testfilefollowevent.enabled", "true")
|
||||||
.put("event.timer.testfilefollowevent.class", "org.xbib.event.io.file.TestFileFollowEvent")
|
.put("event.filefollow.testfilefollowevent.class", "org.xbib.event.io.file.TestFileFollowEvent")
|
||||||
.put("event.timer.testfilefollowevent.path", path.toString())
|
.put("event.filefollow.testfilefollowevent.base", path.toString())
|
||||||
|
.put("event.filefollow.testfilefollowevent.pattern", ".*")
|
||||||
.build();
|
.build();
|
||||||
EventManager eventManager = EventManager.newEventManager(settings);
|
EventManager eventManager = EventManager.newEventManager(settings);
|
||||||
FileFollowEventManager fileFolloeEventManager = eventManager.getFileFollowEventManager()
|
FileFollowEventManager fileFolloeEventManager = eventManager.getFileFollowEventManager();
|
||||||
Thread.sleep(10000L);
|
Thread.sleep(5000L);
|
||||||
|
try (BufferedWriter bufferedWriter = Files.newBufferedWriter(path.resolve("test.txt"))) {
|
||||||
|
bufferedWriter.write("Hello");
|
||||||
|
logger.log(Level.INFO, "Hello written");
|
||||||
|
}
|
||||||
|
Thread.sleep(5000L);
|
||||||
fileFolloeEventManager.close();
|
fileFolloeEventManager.close();
|
||||||
|
Files.delete(path.resolve("test.txt"));
|
||||||
|
Files.delete(path);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,6 @@ public class TestFileFollowEventConsumer implements EventConsumer {
|
||||||
@Subscribe
|
@Subscribe
|
||||||
@AllowConcurrentEvents
|
@AllowConcurrentEvents
|
||||||
void onEvent(TestFileFollowEvent event) {
|
void onEvent(TestFileFollowEvent event) {
|
||||||
logger.log(Level.INFO, "received filefollw event path = " + event.getPath());
|
logger.log(Level.INFO, "received filefollw event path = " + event.getPath() + " content = " + event.getContent());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue