add clock event type
This commit is contained in:
parent
0b78c32b91
commit
6689342c24
9 changed files with 51 additions and 67 deletions
|
@ -16,9 +16,7 @@ public interface Event {
|
||||||
|
|
||||||
String getMessage();
|
String getMessage();
|
||||||
|
|
||||||
Instant getCreated();
|
Instant getInstant();
|
||||||
|
|
||||||
Instant getScheduledFor();
|
|
||||||
|
|
||||||
Payload getPayload();
|
Payload getPayload();
|
||||||
|
|
||||||
|
|
|
@ -46,17 +46,20 @@ public class ClockEventManagerService extends AbstractEventManagerService implem
|
||||||
Settings entrySettings = mapEntry.getValue();
|
Settings entrySettings = mapEntry.getValue();
|
||||||
if (entrySettings.getAsBoolean("enabled", true)) {
|
if (entrySettings.getAsBoolean("enabled", true)) {
|
||||||
String entry = entrySettings.get("entry");
|
String entry = entrySettings.get("entry");
|
||||||
if (entry != null) {
|
String type = entrySettings.get("type", "clock");
|
||||||
|
if (entry != null && type != null) {
|
||||||
try {
|
try {
|
||||||
ClockEventService clockEventService = new ClockEventService(this, name);
|
ClockEventService clockEventService = new ClockEventService(this, name, type);
|
||||||
cronSchedule.add(name, CronExpression.parse(entry), clockEventService);
|
cronSchedule.add(name, CronExpression.parse(entry), clockEventService);
|
||||||
logger.log(Level.INFO, "cron job " + name + " scheduled on " + entry);
|
logger.log(Level.INFO, "cron job " + name + "with type " + type + " scheduled on " + entry);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
logger.log(Level.WARNING, "unable to schedule cron job " + mapEntry.getKey() + ", reason " + e.getMessage());
|
logger.log(Level.WARNING, "unable to schedule cron job " + mapEntry.getKey() + ", reason " + e.getMessage());
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
logger.log(Level.WARNING, "clock event service in configuration is incompletely defined, name = " + name );
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
logger.log(Level.WARNING, "clock event service " + name + " in configuration not enabled");
|
logger.log(Level.WARNING, "clock event service in configuration not enabled, name = " + name );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
logger.log(Level.INFO, "entries = " + cronSchedule.getEntries());
|
logger.log(Level.INFO, "entries = " + cronSchedule.getEntries());
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
package org.xbib.event.clock;
|
package org.xbib.event.clock;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
|
@ -15,21 +16,26 @@ public class ClockEventService implements Callable<Boolean> {
|
||||||
|
|
||||||
private final String name;
|
private final String name;
|
||||||
|
|
||||||
|
private final String eventType;
|
||||||
|
|
||||||
public ClockEventService(ClockEventManagerService clockEventManagerService,
|
public ClockEventService(ClockEventManagerService clockEventManagerService,
|
||||||
String name) {
|
String name,
|
||||||
|
String eventType) {
|
||||||
this.clockEventManagerService = clockEventManagerService;
|
this.clockEventManagerService = clockEventManagerService;
|
||||||
this.name = name;
|
this.name = name;
|
||||||
|
this.eventType = Objects.requireNonNull(eventType, "clock event type must not be null");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Boolean call() {
|
public Boolean call() {
|
||||||
try {
|
try {
|
||||||
if (clockEventManagerService.getSuspended().contains(name)) {
|
if (clockEventManagerService.getSuspended().contains(name)) {
|
||||||
logger.log(Level.FINE, "clock event " + name + " suspended");
|
logger.log(Level.FINE, "clock event service " + name + " is suspended, unable to continue");
|
||||||
return false;
|
return false;
|
||||||
} else {
|
} else {
|
||||||
Event clockEvent = EventManager.eventBuilder()
|
Event clockEvent = EventManager.eventBuilder()
|
||||||
.setType("clock")
|
.setType(eventType)
|
||||||
|
|
||||||
.build();
|
.build();
|
||||||
clockEventManagerService.publish(clockEvent);
|
clockEventManagerService.publish(clockEvent);
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -1,23 +0,0 @@
|
||||||
package org.xbib.event.clock;
|
|
||||||
|
|
||||||
import org.xbib.event.ClockEvent;
|
|
||||||
import org.xbib.event.EventConsumer;
|
|
||||||
|
|
||||||
import java.util.logging.Logger;
|
|
||||||
import org.xbib.event.bus.AllowConcurrentEvents;
|
|
||||||
import org.xbib.event.bus.Subscribe;
|
|
||||||
|
|
||||||
public class SimpleClockEventConsumer implements EventConsumer {
|
|
||||||
|
|
||||||
private static final Logger logger = Logger.getLogger(SimpleClockEventConsumer.class.getName());
|
|
||||||
|
|
||||||
public SimpleClockEventConsumer() {
|
|
||||||
}
|
|
||||||
|
|
||||||
@Subscribe
|
|
||||||
@AllowConcurrentEvents
|
|
||||||
void onEvent(ClockEvent event) {
|
|
||||||
logger.info("received demo clock event, created = " + event.getCreated());
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -96,10 +96,10 @@ public final class EventManager extends AbstractEventManagerService implements E
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Event eventOf(String eventType,
|
public static Event eventOf(String eventType,
|
||||||
Instant scheduled) {
|
Instant instant) {
|
||||||
return eventBuilder()
|
return eventBuilder()
|
||||||
.setType(eventType)
|
.setType(eventType)
|
||||||
.setScheduledFor(scheduled)
|
.setInstant(instant)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -120,13 +120,9 @@ public final class EventManager extends AbstractEventManagerService implements E
|
||||||
if (map.containsKey("message")) {
|
if (map.containsKey("message")) {
|
||||||
builder.setMessage(map.getOrDefault("message", "").toString());
|
builder.setMessage(map.getOrDefault("message", "").toString());
|
||||||
}
|
}
|
||||||
if (map.containsKey("created")) {
|
if (map.containsKey("instant")) {
|
||||||
String created = map.getOrDefault("created", "").toString();
|
String instant = map.getOrDefault("instant", "").toString();
|
||||||
builder.setCreated(Instant.parse(created));
|
builder.setInstant(Instant.parse(instant));
|
||||||
}
|
|
||||||
if (map.containsKey("scheduled")) {
|
|
||||||
String scheduled = map.getOrDefault("scheduled", "").toString();
|
|
||||||
builder.setCreated(Instant.parse(scheduled));
|
|
||||||
}
|
}
|
||||||
if (map.containsKey("payload")) {
|
if (map.containsKey("payload")) {
|
||||||
PayloadImpl payload = new PayloadImpl((Map<String, Object>) map.get("payload"));
|
PayloadImpl payload = new PayloadImpl((Map<String, Object>) map.get("payload"));
|
||||||
|
@ -383,13 +379,8 @@ public final class EventManager extends AbstractEventManagerService implements E
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Instant getCreated() {
|
public Instant getInstant() {
|
||||||
return builder.created;
|
return builder.instant;
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public Instant getScheduledFor() {
|
|
||||||
return builder.scheduled;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -422,9 +413,12 @@ public final class EventManager extends AbstractEventManagerService implements E
|
||||||
if (getPayload() != null && !getPayload().isEmpty()) {
|
if (getPayload() != null && !getPayload().isEmpty()) {
|
||||||
builder.buildKey("payload").buildMap(getPayload());
|
builder.buildKey("payload").buildMap(getPayload());
|
||||||
}
|
}
|
||||||
builder.fieldIfNotNull("created", getCreated() != null ? getCreated().toString() : null);
|
if (getInstant() != null) {
|
||||||
builder.fieldIfNotNull("scheduled", getScheduledFor() != null ? getScheduledFor().toString() : null);
|
builder.fieldIfNotNull("instant", getInstant().toString());
|
||||||
builder.fieldIfNotNull("path", getPath() != null ? getPath().toAbsolutePath().toString() : null);
|
}
|
||||||
|
if (getPath() != null) {
|
||||||
|
builder.fieldIfNotNull("path", getPath().toAbsolutePath().toString());
|
||||||
|
}
|
||||||
builder.endMap();
|
builder.endMap();
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
@ -482,9 +476,7 @@ public final class EventManager extends AbstractEventManagerService implements E
|
||||||
|
|
||||||
String message;
|
String message;
|
||||||
|
|
||||||
Instant created;
|
Instant instant;
|
||||||
|
|
||||||
Instant scheduled;
|
|
||||||
|
|
||||||
PayloadImpl payload;
|
PayloadImpl payload;
|
||||||
|
|
||||||
|
@ -526,13 +518,8 @@ public final class EventManager extends AbstractEventManagerService implements E
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
public EventBuilder setCreated(Instant created) {
|
public EventBuilder setInstant(Instant instant) {
|
||||||
this.created = created;
|
this.instant = instant;
|
||||||
return this;
|
|
||||||
}
|
|
||||||
|
|
||||||
public EventBuilder setScheduledFor(Instant scheduled) {
|
|
||||||
this.scheduled = scheduled;
|
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -574,7 +561,7 @@ public final class EventManager extends AbstractEventManagerService implements E
|
||||||
cl = NullEvent.class;
|
cl = NullEvent.class;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
this.created = Instant.now();
|
this.instant = Instant.now();
|
||||||
if (listener != null) {
|
if (listener != null) {
|
||||||
return cl.getDeclaredConstructor(EventBuilder.class, Listener.class).newInstance(this, listener);
|
return cl.getDeclaredConstructor(EventBuilder.class, Listener.class).newInstance(this, listener);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -13,10 +13,12 @@ public class ClockEventManagerTest {
|
||||||
TestClockEventConsumer clockEventConsumer = new TestClockEventConsumer();
|
TestClockEventConsumer clockEventConsumer = new TestClockEventConsumer();
|
||||||
Settings settings = Settings.settingsBuilder()
|
Settings settings = Settings.settingsBuilder()
|
||||||
.put("event.clock.testclockevent.enabled", "true")
|
.put("event.clock.testclockevent.enabled", "true")
|
||||||
|
.put("event.clock.testclockevent.type", "test-clock-event")
|
||||||
.put("event.clock.testclockevent.entry", "*/1 6-21 * * *")
|
.put("event.clock.testclockevent.entry", "*/1 6-21 * * *")
|
||||||
.build();
|
.build();
|
||||||
EventManager eventManager = EventManager.builder()
|
EventManager eventManager = EventManager.builder()
|
||||||
.setSettings(settings)
|
.setSettings(settings)
|
||||||
|
.register("test-clock-event", TestClockEvent.class)
|
||||||
.register(clockEventConsumer)
|
.register(clockEventConsumer)
|
||||||
.build();
|
.build();
|
||||||
Thread.sleep(90000L);
|
Thread.sleep(90000L);
|
||||||
|
|
|
@ -0,0 +1,11 @@
|
||||||
|
package org.xbib.event.clock;
|
||||||
|
|
||||||
|
import org.xbib.event.common.ClockEventImpl;
|
||||||
|
import org.xbib.event.common.EventManager;
|
||||||
|
|
||||||
|
public class TestClockEvent extends ClockEventImpl {
|
||||||
|
|
||||||
|
public TestClockEvent(EventManager.EventBuilder builder) {
|
||||||
|
super(builder);
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,6 +1,5 @@
|
||||||
package org.xbib.event.clock;
|
package org.xbib.event.clock;
|
||||||
|
|
||||||
import org.xbib.event.ClockEvent;
|
|
||||||
import org.xbib.event.EventConsumer;
|
import org.xbib.event.EventConsumer;
|
||||||
import org.xbib.event.bus.AllowConcurrentEvents;
|
import org.xbib.event.bus.AllowConcurrentEvents;
|
||||||
import org.xbib.event.bus.Subscribe;
|
import org.xbib.event.bus.Subscribe;
|
||||||
|
@ -15,7 +14,8 @@ public class TestClockEventConsumer implements EventConsumer {
|
||||||
|
|
||||||
@Subscribe
|
@Subscribe
|
||||||
@AllowConcurrentEvents
|
@AllowConcurrentEvents
|
||||||
void onEvent(ClockEvent event) {
|
void onEvent(TestClockEvent event) {
|
||||||
logger.log(Level.INFO, "received clock event on " + Instant.now() + " event instant = " + event.getCreated());
|
logger.log(Level.INFO, "received test clock event on " + Instant.now() +
|
||||||
|
" event instant = " + event.getInstant());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,7 +16,7 @@ public class TestTimerEventConsumer implements EventConsumer {
|
||||||
@Subscribe
|
@Subscribe
|
||||||
@AllowConcurrentEvents
|
@AllowConcurrentEvents
|
||||||
void onEvent(TimerEvent event) {
|
void onEvent(TimerEvent event) {
|
||||||
logger.log(Level.INFO, "received timer event on " + Instant.now() + " event instant = " + event.getCreated());
|
logger.log(Level.INFO, "received timer event on " + Instant.now() + " event instant = " + event.getInstant());
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue