From 5c31d6e0559cbc6cb8c827b4c71f5de0e0918d2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rg=20Prante?= Date: Tue, 16 Jan 2024 09:34:39 +0100 Subject: [PATCH] add event-pubsub --- event-pubsub/build.gradle | 3 + event-pubsub/src/main/java/module-info.java | 6 + .../org/xbib/event/pubsub/MessageBroker.java | 106 ++++++++++++++++++ .../java/org/xbib/event/pubsub/Publisher.java | 4 + .../org/xbib/event/pubsub/Subscriber.java | 4 + .../java/org/xbib/event/pubsub/Topic.java | 46 ++++++++ settings.gradle | 1 + 7 files changed, 170 insertions(+) create mode 100644 event-pubsub/build.gradle create mode 100644 event-pubsub/src/main/java/module-info.java create mode 100644 event-pubsub/src/main/java/org/xbib/event/pubsub/MessageBroker.java create mode 100644 event-pubsub/src/main/java/org/xbib/event/pubsub/Publisher.java create mode 100644 event-pubsub/src/main/java/org/xbib/event/pubsub/Subscriber.java create mode 100644 event-pubsub/src/main/java/org/xbib/event/pubsub/Topic.java diff --git a/event-pubsub/build.gradle b/event-pubsub/build.gradle new file mode 100644 index 0000000..44a5eeb --- /dev/null +++ b/event-pubsub/build.gradle @@ -0,0 +1,3 @@ +dependencies { + api project(':event-common') +} diff --git a/event-pubsub/src/main/java/module-info.java b/event-pubsub/src/main/java/module-info.java new file mode 100644 index 0000000..4050289 --- /dev/null +++ b/event-pubsub/src/main/java/module-info.java @@ -0,0 +1,6 @@ +module org.xbib.event.pubsub { + uses org.xbib.event.pubsub.Subscriber; + exports org.xbib.event.pubsub; + requires org.xbib.event.common; + requires java.logging; +} diff --git a/event-pubsub/src/main/java/org/xbib/event/pubsub/MessageBroker.java b/event-pubsub/src/main/java/org/xbib/event/pubsub/MessageBroker.java new file mode 100644 index 0000000..f675789 --- /dev/null +++ b/event-pubsub/src/main/java/org/xbib/event/pubsub/MessageBroker.java @@ -0,0 +1,106 @@ +package org.xbib.event.pubsub; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.xbib.event.bus.AsyncEventBus; +import org.xbib.event.bus.SubscriberExceptionContext; +import org.xbib.event.bus.SubscriberExceptionHandler; + +public class MessageBroker { + + private static final Logger logger = Logger.getLogger(MessageBroker.class.getName()); + + private final Builder builder; + + private final Collection> topics; + + private MessageBroker(Builder builder) { + this.builder = builder; + this.topics = new ArrayList<>(); + } + + public void subscribe(Subscriber subscriber, Topic topic) { + topic.add(subscriber); + if (!topics.contains(topic)) { + topics.add(topic); + } + builder.eventBus.register(subscriber); + } + + public void unsubscribe(Subscriber subscriber, Topic topic) { + topic.remove(subscriber); + builder.eventBus.unregister(subscriber); + } + + public void publish(E event) { + builder.eventBus.post(event); + } + + public static class Builder { + + private int threadCount; + + private ExecutorService executorService; + + private ClassLoader classLoader; + + private SubscriberExceptionHandler subscriberExceptionHandler; + + private AsyncEventBus eventBus; + + private Builder() { + this.subscriberExceptionHandler = new EventManagerExceptionHandler(); + } + + public Builder setThreadCount(int threadCount) { + this.threadCount = threadCount; + return this; + } + + public Builder setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + return this; + } + + public Builder setClassLoader(ClassLoader classLoader) { + this.classLoader = classLoader; + return this; + } + + public Builder setEventBus(AsyncEventBus eventBus) { + this.eventBus = eventBus; + return this; + } + + public Builder setSubscriberExceptionHandler(SubscriberExceptionHandler subscriberExceptionHandler) { + this.subscriberExceptionHandler = subscriberExceptionHandler; + return this; + } + + public MessageBroker build() { + if (executorService == null) { + executorService = threadCount > 0 ? + Executors.newFixedThreadPool(threadCount) : + Executors.newCachedThreadPool(); + } + if (classLoader == null) { + classLoader = MessageBroker.class.getClassLoader(); + } + if (eventBus == null) { + eventBus = new AsyncEventBus(executorService, subscriberExceptionHandler); + } + return new MessageBroker(this); + } + } + + private static class EventManagerExceptionHandler implements SubscriberExceptionHandler { + @Override + public void handleException(Throwable exception, SubscriberExceptionContext context) { + logger.log(Level.SEVERE, exception.getMessage(), exception); + } + } +} diff --git a/event-pubsub/src/main/java/org/xbib/event/pubsub/Publisher.java b/event-pubsub/src/main/java/org/xbib/event/pubsub/Publisher.java new file mode 100644 index 0000000..38cf764 --- /dev/null +++ b/event-pubsub/src/main/java/org/xbib/event/pubsub/Publisher.java @@ -0,0 +1,4 @@ +package org.xbib.event.pubsub; + +public class Publisher { +} diff --git a/event-pubsub/src/main/java/org/xbib/event/pubsub/Subscriber.java b/event-pubsub/src/main/java/org/xbib/event/pubsub/Subscriber.java new file mode 100644 index 0000000..7350ff8 --- /dev/null +++ b/event-pubsub/src/main/java/org/xbib/event/pubsub/Subscriber.java @@ -0,0 +1,4 @@ +package org.xbib.event.pubsub; + +public class Subscriber { +} diff --git a/event-pubsub/src/main/java/org/xbib/event/pubsub/Topic.java b/event-pubsub/src/main/java/org/xbib/event/pubsub/Topic.java new file mode 100644 index 0000000..56b8091 --- /dev/null +++ b/event-pubsub/src/main/java/org/xbib/event/pubsub/Topic.java @@ -0,0 +1,46 @@ +package org.xbib.event.pubsub; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Queue; +import java.util.concurrent.LinkedBlockingQueue; + +public class Topic { + + private final String name; + + private final Queue queue; + + private final Collection> subscribers; + + public Topic(String name) { + this.name = name; + this.queue = new LinkedBlockingQueue<>(); + this.subscribers = new ArrayList<>(); + } + + public String getName() { + return name; + } + + public Queue getQueue() { + return queue; + } + + public Collection> getSubscribers() { + return subscribers; + } + + public void add(Subscriber subscriber) { + subscribers.add(subscriber); + } + + public void remove(Subscriber subscriber) { + subscribers.remove(subscriber); + } + + public void add(E event) { + queue.add(event); + } + +} diff --git a/settings.gradle b/settings.gradle index 547fade..f23b7e7 100644 --- a/settings.gradle +++ b/settings.gradle @@ -48,4 +48,5 @@ include 'event-async' include 'event-common' include 'event-loop' include 'event-net-http' +include 'event-pubsub' include 'event-syslog'