add event-pubsub

This commit is contained in:
Jörg Prante 2024-01-16 09:34:39 +01:00
parent 7c5a1f1a2b
commit 5c31d6e055
7 changed files with 170 additions and 0 deletions

View file

@ -0,0 +1,3 @@
dependencies {
api project(':event-common')
}

View file

@ -0,0 +1,6 @@
module org.xbib.event.pubsub {
uses org.xbib.event.pubsub.Subscriber;
exports org.xbib.event.pubsub;
requires org.xbib.event.common;
requires java.logging;
}

View file

@ -0,0 +1,106 @@
package org.xbib.event.pubsub;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.xbib.event.bus.AsyncEventBus;
import org.xbib.event.bus.SubscriberExceptionContext;
import org.xbib.event.bus.SubscriberExceptionHandler;
public class MessageBroker<E> {
private static final Logger logger = Logger.getLogger(MessageBroker.class.getName());
private final Builder builder;
private final Collection<Topic<E>> topics;
private MessageBroker(Builder builder) {
this.builder = builder;
this.topics = new ArrayList<>();
}
public void subscribe(Subscriber<E> subscriber, Topic<E> topic) {
topic.add(subscriber);
if (!topics.contains(topic)) {
topics.add(topic);
}
builder.eventBus.register(subscriber);
}
public void unsubscribe(Subscriber<E> subscriber, Topic<E> topic) {
topic.remove(subscriber);
builder.eventBus.unregister(subscriber);
}
public void publish(E event) {
builder.eventBus.post(event);
}
public static class Builder {
private int threadCount;
private ExecutorService executorService;
private ClassLoader classLoader;
private SubscriberExceptionHandler subscriberExceptionHandler;
private AsyncEventBus eventBus;
private Builder() {
this.subscriberExceptionHandler = new EventManagerExceptionHandler();
}
public Builder setThreadCount(int threadCount) {
this.threadCount = threadCount;
return this;
}
public Builder setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
return this;
}
public Builder setClassLoader(ClassLoader classLoader) {
this.classLoader = classLoader;
return this;
}
public Builder setEventBus(AsyncEventBus eventBus) {
this.eventBus = eventBus;
return this;
}
public Builder setSubscriberExceptionHandler(SubscriberExceptionHandler subscriberExceptionHandler) {
this.subscriberExceptionHandler = subscriberExceptionHandler;
return this;
}
public MessageBroker build() {
if (executorService == null) {
executorService = threadCount > 0 ?
Executors.newFixedThreadPool(threadCount) :
Executors.newCachedThreadPool();
}
if (classLoader == null) {
classLoader = MessageBroker.class.getClassLoader();
}
if (eventBus == null) {
eventBus = new AsyncEventBus(executorService, subscriberExceptionHandler);
}
return new MessageBroker(this);
}
}
private static class EventManagerExceptionHandler implements SubscriberExceptionHandler {
@Override
public void handleException(Throwable exception, SubscriberExceptionContext context) {
logger.log(Level.SEVERE, exception.getMessage(), exception);
}
}
}

View file

@ -0,0 +1,4 @@
package org.xbib.event.pubsub;
public class Publisher {
}

View file

@ -0,0 +1,4 @@
package org.xbib.event.pubsub;
public class Subscriber<E> {
}

View file

@ -0,0 +1,46 @@
package org.xbib.event.pubsub;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
public class Topic<E> {
private final String name;
private final Queue<E> queue;
private final Collection<Subscriber<E>> subscribers;
public Topic(String name) {
this.name = name;
this.queue = new LinkedBlockingQueue<>();
this.subscribers = new ArrayList<>();
}
public String getName() {
return name;
}
public Queue<E> getQueue() {
return queue;
}
public Collection<Subscriber<E>> getSubscribers() {
return subscribers;
}
public void add(Subscriber<E> subscriber) {
subscribers.add(subscriber);
}
public void remove(Subscriber<E> subscriber) {
subscribers.remove(subscriber);
}
public void add(E event) {
queue.add(event);
}
}

View file

@ -48,4 +48,5 @@ include 'event-async'
include 'event-common'
include 'event-loop'
include 'event-net-http'
include 'event-pubsub'
include 'event-syslog'