add event type to file follow events
This commit is contained in:
parent
6689342c24
commit
2eaab6dd4d
6 changed files with 31 additions and 11 deletions
|
@ -1,7 +1,6 @@
|
||||||
package org.xbib.event.path;
|
package org.xbib.event.path;
|
||||||
|
|
||||||
import org.xbib.event.Event;
|
import org.xbib.event.Event;
|
||||||
import org.xbib.event.bus.EventBus;
|
|
||||||
import org.xbib.event.common.AbstractEventManagerService;
|
import org.xbib.event.common.AbstractEventManagerService;
|
||||||
import org.xbib.event.common.EventManager;
|
import org.xbib.event.common.EventManager;
|
||||||
import org.xbib.event.common.EventManagerService;
|
import org.xbib.event.common.EventManagerService;
|
||||||
|
@ -41,6 +40,7 @@ public class FileFollowEventManagerService extends AbstractEventManagerService i
|
||||||
String name = entry.getKey();
|
String name = entry.getKey();
|
||||||
Settings definition = entry.getValue();
|
Settings definition = entry.getValue();
|
||||||
if (definition.getAsBoolean("enabled", true)) {
|
if (definition.getAsBoolean("enabled", true)) {
|
||||||
|
String type = definition.get("type", "filefollow");
|
||||||
String baseStr = definition.get("base");
|
String baseStr = definition.get("base");
|
||||||
Objects.requireNonNull(baseStr);
|
Objects.requireNonNull(baseStr);
|
||||||
String patternStr = definition.get("pattern");
|
String patternStr = definition.get("pattern");
|
||||||
|
@ -48,10 +48,13 @@ public class FileFollowEventManagerService extends AbstractEventManagerService i
|
||||||
try {
|
try {
|
||||||
Path base = Paths.get(baseStr);
|
Path base = Paths.get(baseStr);
|
||||||
Pattern pattern = Pattern.compile(patternStr);
|
Pattern pattern = Pattern.compile(patternStr);
|
||||||
FileFollowEventService fileFollowEventService = new FileFollowEventService(this, name, base, pattern);
|
FileFollowEventService fileFollowEventService = new FileFollowEventService(this,
|
||||||
|
type, name, base, pattern);
|
||||||
Future<?> future = executorService.submit(fileFollowEventService);
|
Future<?> future = executorService.submit(fileFollowEventService);
|
||||||
eventServiceMap.put(future, fileFollowEventService);
|
eventServiceMap.put(future, fileFollowEventService);
|
||||||
logger.log(Level.INFO, "file follow service " + name + " with base " + base + " and pattern " + pattern + " added");
|
logger.log(Level.INFO, "file follow service " + name +
|
||||||
|
" with type " + type +
|
||||||
|
" with base " + base + " and pattern " + pattern + " added");
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.log(Level.SEVERE, "unable to create file follow service " +name + ", reason " + e.getMessage(), e);
|
logger.log(Level.SEVERE, "unable to create file follow service " +name + ", reason " + e.getMessage(), e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,8 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
|
||||||
|
|
||||||
private final FileFollowEventManagerService fileFollowEventManagerService;
|
private final FileFollowEventManagerService fileFollowEventManagerService;
|
||||||
|
|
||||||
|
private final String eventType;
|
||||||
|
|
||||||
private final String name;
|
private final String name;
|
||||||
|
|
||||||
private final Path base;
|
private final Path base;
|
||||||
|
@ -46,10 +48,12 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
|
||||||
private volatile boolean keepWatching;
|
private volatile boolean keepWatching;
|
||||||
|
|
||||||
public FileFollowEventService(FileFollowEventManagerService fileFollowEventManagerService,
|
public FileFollowEventService(FileFollowEventManagerService fileFollowEventManagerService,
|
||||||
|
String eventType,
|
||||||
String name,
|
String name,
|
||||||
Path base,
|
Path base,
|
||||||
Pattern pattern) throws IOException {
|
Pattern pattern) throws IOException {
|
||||||
this.fileFollowEventManagerService = fileFollowEventManagerService;
|
this.fileFollowEventManagerService = fileFollowEventManagerService;
|
||||||
|
this.eventType = eventType;
|
||||||
this.name = name;
|
this.name = name;
|
||||||
this.base = base;
|
this.base = base;
|
||||||
this.pattern = pattern;
|
this.pattern = pattern;
|
||||||
|
@ -59,7 +63,8 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
|
||||||
kinds[0] = StandardWatchEventKinds.ENTRY_MODIFY;
|
kinds[0] = StandardWatchEventKinds.ENTRY_MODIFY;
|
||||||
base.register(watchService, kinds);
|
base.register(watchService, kinds);
|
||||||
this.fileSizes = new LinkedHashMap<>();
|
this.fileSizes = new LinkedHashMap<>();
|
||||||
fillFileSizes(base, pattern);
|
// initialize the sizes of existing files, so we can safely create first event
|
||||||
|
setFileSizes(base, pattern);
|
||||||
this.keepWatching = true;
|
this.keepWatching = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,13 +93,14 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
|
||||||
long currentSize = p.toFile().length();
|
long currentSize = p.toFile().length();
|
||||||
fileSizes.put(p, currentSize);
|
fileSizes.put(p, currentSize);
|
||||||
String content = readRange(channel, lastSize, currentSize);
|
String content = readRange(channel, lastSize, currentSize);
|
||||||
// split content by line, this allows pattern matching without preprocessing in worker
|
// split file content by line
|
||||||
|
// this prevents swalloed events and allows pattern matching without preprocessing in worker
|
||||||
for (String line : content.split("\n")) {
|
for (String line : content.split("\n")) {
|
||||||
if (fileFollowEventManagerService.getSuspended().contains(name)) {
|
if (fileFollowEventManagerService.getSuspended().contains(name)) {
|
||||||
logger.log(Level.WARNING, name + " is suspended");
|
logger.log(Level.WARNING, name + " is suspended");
|
||||||
} else {
|
} else {
|
||||||
Event event = EventManager.eventBuilder()
|
Event event = EventManager.eventBuilder()
|
||||||
.setType("filefollow")
|
.setType(eventType)
|
||||||
.setCode(base.toString())
|
.setCode(base.toString())
|
||||||
.setPath(path)
|
.setPath(path)
|
||||||
.setMessage(line)
|
.setMessage(line)
|
||||||
|
@ -139,7 +145,7 @@ public class FileFollowEventService implements Callable<Integer>, Closeable {
|
||||||
return charBuffer.toString();
|
return charBuffer.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void fillFileSizes(Path base, Pattern pattern) throws IOException {
|
private void setFileSizes(Path base, Pattern pattern) throws IOException {
|
||||||
if (!Files.exists(base)) {
|
if (!Files.exists(base)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,11 +21,13 @@ public class FileFollowEventManagerTest {
|
||||||
TestFileFollowEventConsumer consumer = new TestFileFollowEventConsumer();
|
TestFileFollowEventConsumer consumer = new TestFileFollowEventConsumer();
|
||||||
Settings settings = Settings.settingsBuilder()
|
Settings settings = Settings.settingsBuilder()
|
||||||
.put("event.filefollow.testfilefollowevent.enabled", "true")
|
.put("event.filefollow.testfilefollowevent.enabled", "true")
|
||||||
|
.put("event.filefollow.testfilefollowevent.type", "filefollow-test")
|
||||||
.put("event.filefollow.testfilefollowevent.base", path.toString())
|
.put("event.filefollow.testfilefollowevent.base", path.toString())
|
||||||
.put("event.filefollow.testfilefollowevent.pattern", ".*")
|
.put("event.filefollow.testfilefollowevent.pattern", ".*")
|
||||||
.build();
|
.build();
|
||||||
EventManager eventManager = EventManager.builder()
|
EventManager eventManager = EventManager.builder()
|
||||||
.setSettings(settings)
|
.setSettings(settings)
|
||||||
|
.register("filefollow-test", TestFileFollowEvent.class)
|
||||||
.register(consumer)
|
.register(consumer)
|
||||||
.build();
|
.build();
|
||||||
Thread.sleep(1000L);
|
Thread.sleep(1000L);
|
||||||
|
|
|
@ -50,8 +50,8 @@ public class PathEventManagerTest {
|
||||||
TestPathEventConsumer consumer = new TestPathEventConsumer();
|
TestPathEventConsumer consumer = new TestPathEventConsumer();
|
||||||
Settings settings = Settings.settingsBuilder()
|
Settings settings = Settings.settingsBuilder()
|
||||||
.put("event.path.testpathevent.enabled", "true")
|
.put("event.path.testpathevent.enabled", "true")
|
||||||
.put("event.path.testpathevent.path", path.toString())
|
|
||||||
.put("event.path.testpathevent.type", "path-ext")
|
.put("event.path.testpathevent.type", "path-ext")
|
||||||
|
.put("event.path.testpathevent.path", path.toString())
|
||||||
.build();
|
.build();
|
||||||
EventManager eventManager = EventManager.builder()
|
EventManager eventManager = EventManager.builder()
|
||||||
.setSettings(settings)
|
.setSettings(settings)
|
||||||
|
|
|
@ -0,0 +1,10 @@
|
||||||
|
package org.xbib.event.path;
|
||||||
|
|
||||||
|
import org.xbib.event.common.EventManager;
|
||||||
|
import org.xbib.event.common.FileFollowEventImpl;
|
||||||
|
|
||||||
|
public class TestFileFollowEvent extends FileFollowEventImpl {
|
||||||
|
public TestFileFollowEvent(EventManager.EventBuilder builder) {
|
||||||
|
super(builder);
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,7 +1,6 @@
|
||||||
package org.xbib.event.path;
|
package org.xbib.event.path;
|
||||||
|
|
||||||
import org.xbib.event.EventConsumer;
|
import org.xbib.event.EventConsumer;
|
||||||
import org.xbib.event.FileFollowEvent;
|
|
||||||
import org.xbib.event.bus.AllowConcurrentEvents;
|
import org.xbib.event.bus.AllowConcurrentEvents;
|
||||||
import org.xbib.event.bus.Subscribe;
|
import org.xbib.event.bus.Subscribe;
|
||||||
|
|
||||||
|
@ -14,7 +13,7 @@ public class TestFileFollowEventConsumer implements EventConsumer {
|
||||||
|
|
||||||
@Subscribe
|
@Subscribe
|
||||||
@AllowConcurrentEvents
|
@AllowConcurrentEvents
|
||||||
void onEvent(FileFollowEvent event) {
|
void onEvent(TestFileFollowEvent event) {
|
||||||
logger.log(Level.INFO, "received filefollow event, path = " + event.getPath() + " content = " + event.getMessage());
|
logger.log(Level.INFO, "received test filefollow event, path = " + event.getPath() + " content = " + event.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue