diff --git a/NOTICE.txt b/NOTICE.txt index 0088fc5..588d9aa 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -11,3 +11,11 @@ by Fernando Miguel Gamboa de Carvalho (fmcarvalho) branched as of 23 December, 2021 License: Apache 2.0 + +The work in org.xbib.event.bus is taken from Guava + +https://github.com/google/guava + +as of 27 August, 2022 + +License: Apache 2.0 diff --git a/build.gradle b/build.gradle index 6afd877..5c9da9f 100644 --- a/build.gradle +++ b/build.gradle @@ -7,6 +7,7 @@ wrapper { gradleVersion = libs.versions.gradle.get() distributionType = Wrapper.DistributionType.ALL } + ext { user = 'jprante' name = 'event' @@ -32,8 +33,9 @@ apply from: rootProject.file('gradle/publishing/sonatype.gradle') dependencies { api libs.settings.api - implementation libs.guava + implementation libs.net implementation libs.time + implementation libs.datastructures.common implementation libs.datastructures.json.tiny implementation libs.reactivestreams testImplementation libs.rxjava3 diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 92f06b5..2ec77e5 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.4.2-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.5-all.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index 1b6c787..a69d9cb 100755 --- a/gradlew +++ b/gradlew @@ -205,6 +205,12 @@ set -- \ org.gradle.wrapper.GradleWrapperMain \ "$@" +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + # Use "xargs" to parse quoted args. # # With -n1 it outputs one arg per line, with the quotes and backslashes removed. diff --git a/gradlew.bat b/gradlew.bat index 107acd3..53a6b23 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -1,89 +1,91 @@ -@rem -@rem Copyright 2015 the original author or authors. -@rem -@rem Licensed under the Apache License, Version 2.0 (the "License"); -@rem you may not use this file except in compliance with the License. -@rem You may obtain a copy of the License at -@rem -@rem https://www.apache.org/licenses/LICENSE-2.0 -@rem -@rem Unless required by applicable law or agreed to in writing, software -@rem distributed under the License is distributed on an "AS IS" BASIS, -@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -@rem See the License for the specific language governing permissions and -@rem limitations under the License. -@rem - -@if "%DEBUG%" == "" @echo off -@rem ########################################################################## -@rem -@rem Gradle startup script for Windows -@rem -@rem ########################################################################## - -@rem Set local scope for the variables with windows NT shell -if "%OS%"=="Windows_NT" setlocal - -set DIRNAME=%~dp0 -if "%DIRNAME%" == "" set DIRNAME=. -set APP_BASE_NAME=%~n0 -set APP_HOME=%DIRNAME% - -@rem Resolve any "." and ".." in APP_HOME to make it shorter. -for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi - -@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" - -@rem Find java.exe -if defined JAVA_HOME goto findJavaFromJavaHome - -set JAVA_EXE=java.exe -%JAVA_EXE% -version >NUL 2>&1 -if "%ERRORLEVEL%" == "0" goto execute - -echo. -echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:findJavaFromJavaHome -set JAVA_HOME=%JAVA_HOME:"=% -set JAVA_EXE=%JAVA_HOME%/bin/java.exe - -if exist "%JAVA_EXE%" goto execute - -echo. -echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% -echo. -echo Please set the JAVA_HOME variable in your environment to match the -echo location of your Java installation. - -goto fail - -:execute -@rem Setup the command line - -set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar - - -@rem Execute Gradle -"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* - -:end -@rem End local scope for the variables with windows NT shell -if "%ERRORLEVEL%"=="0" goto mainEnd - -:fail -rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of -rem the _cmd.exe /c_ return code! -if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 -exit /b 1 - -:mainEnd -if "%OS%"=="Windows_NT" endlocal - -:omega +@rem +@rem Copyright 2015 the original author or authors. +@rem +@rem Licensed under the Apache License, Version 2.0 (the "License"); +@rem you may not use this file except in compliance with the License. +@rem You may obtain a copy of the License at +@rem +@rem https://www.apache.org/licenses/LICENSE-2.0 +@rem +@rem Unless required by applicable law or agreed to in writing, software +@rem distributed under the License is distributed on an "AS IS" BASIS, +@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +@rem See the License for the specific language governing permissions and +@rem limitations under the License. +@rem + +@if "%DEBUG%"=="" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%"=="" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Resolve any "." and ".." in APP_HOME to make it shorter. +for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if %ERRORLEVEL% equ 0 goto execute + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto execute + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* + +:end +@rem End local scope for the variables with windows NT shell +if %ERRORLEVEL% equ 0 goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +set EXIT_CODE=%ERRORLEVEL% +if %EXIT_CODE% equ 0 set EXIT_CODE=1 +if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% +exit /b %EXIT_CODE% + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/settings.gradle b/settings.gradle index f5d4669..916d41b 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,17 +1,22 @@ dependencyResolutionManagement { versionCatalogs { libs { - version('gradle', '7.4.2') + version('gradle', '7.5') + version('groovy', '3.0.10') version('junit', '5.8.2') + version('datastructures', '1.0.0') + version('net', '3.0.0') + version('content', '5.0.0') library('junit-jupiter-api', 'org.junit.jupiter', 'junit-jupiter-api').versionRef('junit') library('junit-jupiter-params', 'org.junit.jupiter', 'junit-jupiter-params').versionRef('junit') library('junit-jupiter-engine', 'org.junit.jupiter', 'junit-jupiter-engine').versionRef('junit') - library('junit4', 'junit', 'junit').version('4.13.2') library('hamcrest', 'org.hamcrest', 'hamcrest-library').version('2.2') - library('settings-api', 'org.xbib', 'settings-api').version('4.0.0') - library('guava', 'org.xbib', 'guava').version('30.1') - library('time', 'org.xbib', 'time').version('2.1.1') - library('datastructures-json-tiny', 'org.xbib', 'datastructures-json-tiny').version('1.0.0') + library('junit4', 'junit', 'junit').version('4.13.2') + library('settings-api', 'org.xbib', 'settings-api').versionRef('content') + library('net', 'org.xbib', 'net').versionRef('net') + library('datastructures-common', 'org.xbib', 'datastructures-common').versionRef('datastructures') + library('datastructures-json-tiny', 'org.xbib', 'datastructures-json-tiny').versionRef('datastructures') + library('time', 'org.xbib', 'time').version('2.3.0') library('reactivestreams', 'org.reactivestreams', 'reactive-streams').version('1.0.3') library('rxjava3', 'io.reactivex.rxjava3', 'rxjava').version('3.0.3') } diff --git a/src/main/java/module-info.java b/src/main/java/module-info.java index 31112bf..8fbf381 100644 --- a/src/main/java/module-info.java +++ b/src/main/java/module-info.java @@ -1,20 +1,18 @@ module org.xbib.event { exports org.xbib.event; exports org.xbib.event.async; + exports org.xbib.event.bus; exports org.xbib.event.clock; exports org.xbib.event.persistence; exports org.xbib.event.queue; - exports org.xbib.event.queue.path.simple; - exports org.xbib.event.queue.path.watch; - exports org.xbib.event.timer; + exports org.xbib.event.syslog; exports org.xbib.event.yield; - exports org.xbib.event.yield.async; - exports org.xbib.event.yield.boxes; - exports org.xbib.event.yield.ops; - requires org.xbib.guava; - requires org.xbib.settings.api; - requires org.xbib.time; + requires org.xbib.datastructures.api; + requires org.xbib.datastructures.common; requires org.xbib.datastructures.json.tiny; - requires org.reactivestreams; + requires transitive org.xbib.settings.api; + requires org.xbib.time; + requires org.xbib.net; + requires transitive org.reactivestreams; requires java.logging; } diff --git a/src/main/java/org/xbib/event/EventService.java b/src/main/java/org/xbib/event/EventService.java index c15ed96..35eefc7 100644 --- a/src/main/java/org/xbib/event/EventService.java +++ b/src/main/java/org/xbib/event/EventService.java @@ -1,4 +1,7 @@ package org.xbib.event; public class EventService { + + public EventService() { + } } diff --git a/src/main/java/org/xbib/event/FileFollowEvent.java b/src/main/java/org/xbib/event/FileFollowEvent.java index 7ad1552..017ea86 100644 --- a/src/main/java/org/xbib/event/FileFollowEvent.java +++ b/src/main/java/org/xbib/event/FileFollowEvent.java @@ -1,4 +1,7 @@ package org.xbib.event; public class FileFollowEvent { + + public FileFollowEvent() { + } } diff --git a/src/main/java/org/xbib/event/async/AbstractAsyncFileReaderLines.java b/src/main/java/org/xbib/event/async/AbstractAsyncFileReaderLines.java index 9360afd..188b559 100644 --- a/src/main/java/org/xbib/event/async/AbstractAsyncFileReaderLines.java +++ b/src/main/java/org/xbib/event/async/AbstractAsyncFileReaderLines.java @@ -27,6 +27,9 @@ public abstract class AbstractAsyncFileReaderLines { private boolean cancelled = false; + public AbstractAsyncFileReaderLines() { + } + protected abstract void onError(Throwable error); protected abstract void onComplete(); diff --git a/src/main/java/org/xbib/event/bus/AllowConcurrentEvents.java b/src/main/java/org/xbib/event/bus/AllowConcurrentEvents.java new file mode 100644 index 0000000..1f8bdbd --- /dev/null +++ b/src/main/java/org/xbib/event/bus/AllowConcurrentEvents.java @@ -0,0 +1,17 @@ +package org.xbib.event.bus; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marks an event subscriber method as being thread-safe. This annotation indicates that EventBus + * may invoke the event subscriber simultaneously from multiple threads. + * + *

This does not mark the method, and so should be used in combination with {@link Subscribe}. + * + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface AllowConcurrentEvents {} diff --git a/src/main/java/org/xbib/event/bus/AsyncEventBus.java b/src/main/java/org/xbib/event/bus/AsyncEventBus.java new file mode 100644 index 0000000..a870d05 --- /dev/null +++ b/src/main/java/org/xbib/event/bus/AsyncEventBus.java @@ -0,0 +1,44 @@ +package org.xbib.event.bus; + +import java.util.concurrent.Executor; + +/** + * An {@link EventBus} that takes the Executor of your choice and uses it to dispatch events, + * allowing dispatch to occur asynchronously. + */ +public class AsyncEventBus extends EventBus { + + /** + * Creates a new AsyncEventBus that will use {@code executor} to dispatch events. Assigns {@code + * identifier} as the bus's name for logging purposes. + * + * @param identifier short name for the bus, for logging purposes. + * @param executor Executor to use to dispatch events. It is the caller's responsibility to shut + * down the executor after the last event has been posted to this event bus. + */ + public AsyncEventBus(String identifier, Executor executor) { + super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE); + } + + /** + * Creates a new AsyncEventBus that will use {@code executor} to dispatch events. + * + * @param executor Executor to use to dispatch events. It is the caller's responsibility to shut + * down the executor after the last event has been posted to this event bus. + * @param subscriberExceptionHandler Handler used to handle exceptions thrown from subscribers. + * See {@link SubscriberExceptionHandler} for more information. + */ + public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) { + super("default", executor, Dispatcher.legacyAsync(), subscriberExceptionHandler); + } + + /** + * Creates a new AsyncEventBus that will use {@code executor} to dispatch events. + * + * @param executor Executor to use to dispatch events. It is the caller's responsibility to shut + * down the executor after the last event has been posted to this event bus. + */ + public AsyncEventBus(Executor executor) { + super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE); + } +} diff --git a/src/main/java/org/xbib/event/bus/DeadEvent.java b/src/main/java/org/xbib/event/bus/DeadEvent.java new file mode 100644 index 0000000..52de3d9 --- /dev/null +++ b/src/main/java/org/xbib/event/bus/DeadEvent.java @@ -0,0 +1,52 @@ +package org.xbib.event.bus; + +import java.util.Objects; + +/** + * Wraps an event that was posted, but which had no subscribers and thus could not be delivered. + * + *

Registering a DeadEvent subscriber is useful for debugging or logging, as it can detect + * misconfigurations in a system's event distribution. + */ +public class DeadEvent { + + private final Object source; + + private final Object event; + + /** + * Creates a new DeadEvent. + * + * @param source object broadcasting the DeadEvent (generally the {@link EventBus}). + * @param event the event that could not be delivered. + */ + public DeadEvent(Object source, Object event) { + this.source = Objects.requireNonNull(source); + this.event = Objects.requireNonNull(event); + } + + /** + * Returns the object that originated this event (not the object that originated the + * wrapped event). This is generally an {@link EventBus}. + * + * @return the source of this event. + */ + public Object getSource() { + return source; + } + + /** + * Returns the wrapped, 'dead' event, which the system was unable to deliver to any registered + * subscriber. + * + * @return the 'dead' event that could not be delivered. + */ + public Object getEvent() { + return event; + } + + @Override + public String toString() { + return "DeadEvent{source=" + source + ",event=" + event + "}"; + } +} diff --git a/src/main/java/org/xbib/event/bus/Dispatcher.java b/src/main/java/org/xbib/event/bus/Dispatcher.java new file mode 100644 index 0000000..3923351 --- /dev/null +++ b/src/main/java/org/xbib/event/bus/Dispatcher.java @@ -0,0 +1,177 @@ +package org.xbib.event.bus; + +import java.util.ArrayDeque; +import java.util.Iterator; +import java.util.Objects; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; + +/** + * Handler for dispatching events to subscribers, providing different event ordering guarantees that + * make sense for different situations. + * + *

Note: The dispatcher is orthogonal to the subscriber's {@code Executor}. The dispatcher + * controls the order in which events are dispatched, while the executor controls how (i.e. on which + * thread) the subscriber is actually called when an event is dispatched to it. + * + */ +public abstract class Dispatcher { + + private Dispatcher() { + } + + /** + * Returns a dispatcher that queues events that are posted reentrantly on a thread that is already + * dispatching an event, guaranteeing that all events posted on a single thread are dispatched to + * all subscribers in the order they are posted. + * + *

When all subscribers are dispatched to using a direct executor (which dispatches on + * the same thread that posts the event), this yields a breadth-first dispatch order on each + * thread. That is, all subscribers to a single event A will be called before any subscribers to + * any events B and C that are posted to the event bus by the subscribers to A. + */ + public static Dispatcher perThreadDispatchQueue() { + return new PerThreadQueuedDispatcher(); + } + + /** + * Returns a dispatcher that queues events that are posted in a single global queue. This behavior + * matches the original behavior of AsyncEventBus exactly, but is otherwise not especially useful. + * For async dispatch, an {@linkplain #immediate() immediate} dispatcher should generally be + * preferable. + */ + public static Dispatcher legacyAsync() { + return new LegacyAsyncDispatcher(); + } + + /** + * Returns a dispatcher that dispatches events to subscribers immediately as they're posted + * without using an intermediate queue to change the dispatch order. This is effectively a + * depth-first dispatch order, vs. breadth-first when using a queue. + */ + public static Dispatcher immediate() { + return ImmediateDispatcher.INSTANCE; + } + + /** Dispatches the given {@code event} to the given {@code subscribers}. */ + public abstract void dispatch(Object event, Iterator subscribers); + + /** Implementation of a {@link #perThreadDispatchQueue()} dispatcher. */ + private static final class PerThreadQueuedDispatcher extends Dispatcher { + + // This dispatcher matches the original dispatch behavior of EventBus. + + /** Per-thread queue of events to dispatch. */ + private final ThreadLocal> queue = + new ThreadLocal>() { + @Override + protected Queue initialValue() { + return new ArrayDeque<>(); + } + }; + + /** Per-thread dispatch state, used to avoid reentrant event dispatching. */ + private final ThreadLocal dispatching = + new ThreadLocal() { + @Override + protected Boolean initialValue() { + return false; + } + }; + + @Override + public void dispatch(Object event, Iterator subscribers) { + Objects.requireNonNull(event); + Objects.requireNonNull(subscribers); + Queue queueForThread = queue.get(); + queueForThread.offer(new Event(event, subscribers)); + + if (!dispatching.get()) { + dispatching.set(true); + try { + Event nextEvent; + while ((nextEvent = queueForThread.poll()) != null) { + while (nextEvent.subscribers.hasNext()) { + nextEvent.subscribers.next().dispatchEvent(nextEvent.event); + } + } + } finally { + dispatching.remove(); + queue.remove(); + } + } + } + + private static final class Event { + private final Object event; + private final Iterator subscribers; + + private Event(Object event, Iterator subscribers) { + this.event = event; + this.subscribers = subscribers; + } + } + } + + /** Implementation of a {@link #legacyAsync()} dispatcher. */ + private static final class LegacyAsyncDispatcher extends Dispatcher { + + // This dispatcher matches the original dispatch behavior of AsyncEventBus. + // + // We can't really make any guarantees about the overall dispatch order for this dispatcher in + // a multithreaded environment for a couple reasons: + // + // 1. Subscribers to events posted on different threads can be interleaved with each other + // freely. (A event on one thread, B event on another could yield any of + // [a1, a2, a3, b1, b2], [a1, b2, a2, a3, b2], [a1, b2, b3, a2, a3], etc.) + // 2. It's possible for subscribers to actually be dispatched to in a different order than they + // were added to the queue. It's easily possible for one thread to take the head of the + // queue, immediately followed by another thread taking the next element in the queue. That + // second thread can then dispatch to the subscriber it took before the first thread does. + // + // All this makes me really wonder if there's any value in queueing here at all. A dispatcher + // that simply loops through the subscribers and dispatches the event to each would actually + // probably provide a stronger order guarantee, though that order would obviously be different + // in some cases. + + /** Global event queue. */ + private final ConcurrentLinkedQueue queue = + new ConcurrentLinkedQueue<>(); + + @Override + public void dispatch(Object event, Iterator subscribers) { + Objects.requireNonNull(event); + while (subscribers.hasNext()) { + queue.add(new EventWithSubscriber(event, subscribers.next())); + } + + EventWithSubscriber e; + while ((e = queue.poll()) != null) { + e.subscriber.dispatchEvent(e.event); + } + } + + private static final class EventWithSubscriber { + private final Object event; + private final Subscriber subscriber; + + private EventWithSubscriber(Object event, Subscriber subscriber) { + this.event = event; + this.subscriber = subscriber; + } + } + } + + /** Implementation of {@link #immediate()}. */ + private static final class ImmediateDispatcher extends Dispatcher { + private static final ImmediateDispatcher INSTANCE = new ImmediateDispatcher(); + + @Override + public void dispatch(Object event, Iterator subscribers) { + Objects.requireNonNull(event); + while (subscribers.hasNext()) { + subscribers.next().dispatchEvent(event); + } + } + } +} diff --git a/src/main/java/org/xbib/event/bus/EventBus.java b/src/main/java/org/xbib/event/bus/EventBus.java new file mode 100644 index 0000000..9adee94 --- /dev/null +++ b/src/main/java/org/xbib/event/bus/EventBus.java @@ -0,0 +1,229 @@ +package org.xbib.event.bus; + +import java.lang.reflect.Method; +import java.util.Iterator; +import java.util.Locale; +import java.util.Objects; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Dispatches events to listeners, and provides ways for listeners to register themselves. + * + *

The EventBus allows publish-subscribe-style communication between components without requiring + * the components to explicitly register with one another (and thus be aware of each other). It is + * designed exclusively to replace traditional Java in-process event distribution using explicit + * registration. It is not a general-purpose publish-subscribe system, nor is it intended + * for interprocess communication. + * + *

Receiving Events

+ * + *

To receive events, an object should: + * + *

    + *
  1. Expose a public method, known as the event subscriber, which accepts a single + * argument of the type of event desired; + *
  2. Mark it with a {@link Subscribe} annotation; + *
  3. Pass itself to an EventBus instance's {@link #register(Object)} method. + *
+ * + *

Posting Events

+ * + *

To post an event, simply provide the event object to the {@link #post(Object)} method. The + * EventBus instance will determine the type of event and route it to all registered listeners. + * + *

Events are routed based on their type — an event will be delivered to any subscriber for + * any type to which the event is assignable. This includes implemented interfaces, all + * superclasses, and all interfaces implemented by superclasses. + * + *

When {@code post} is called, all registered subscribers for an event are run in sequence, so + * subscribers should be reasonably quick. If an event may trigger an extended process (such as a + * database load), spawn a thread or queue it for later. (For a convenient way to do this, use an + * {@link AsyncEventBus}.) + * + *

Subscriber Methods

+ * + *

Event subscriber methods must accept only one argument: the event. + * + *

Subscribers should not, in general, throw. If they do, the EventBus will catch and log the + * exception. This is rarely the right solution for error handling and should not be relied upon; it + * is intended solely to help find problems during development. + * + *

The EventBus guarantees that it will not call a subscriber method from multiple threads + * simultaneously, unless the method explicitly allows it by bearing the {@link + * AllowConcurrentEvents} annotation. If this annotation is not present, subscriber methods need not + * worry about being reentrant, unless also called from outside the EventBus. + * + *

Dead Events

+ * + *

If an event is posted, but no registered subscribers can accept it, it is considered "dead." + * To give the system a second chance to handle dead events, they are wrapped in an instance of + * {@link DeadEvent} and reposted. + * + *

If a subscriber for a supertype of all events (such as Object) is registered, no event will + * ever be considered dead, and no DeadEvents will be generated. Accordingly, while DeadEvent + * extends {@link Object}, a subscriber registered to receive any Object will never receive a + * DeadEvent. + * + *

This class is safe for concurrent use. + * + *

See the Guava User Guide article on {@code EventBus}. + * + */ +public class EventBus { + + private static final Logger logger = Logger.getLogger(EventBus.class.getName()); + + private final String identifier; + + private final Executor executor; + + private final SubscriberExceptionHandler exceptionHandler; + + private final SubscriberRegistry subscribers; + + private final Dispatcher dispatcher; + + public EventBus() { + this("default"); + } + + /** + * Creates a new EventBus with the given {@code identifier}. + * + * @param identifier a brief name for this bus, for logging purposes. Should be a valid Java + * identifier. + */ + public EventBus(String identifier) { + this(identifier, + Runnable::run, + Dispatcher.perThreadDispatchQueue(), + LoggingHandler.INSTANCE); + } + + /** + * Creates a new EventBus with the given {@link SubscriberExceptionHandler}. + * + * @param exceptionHandler Handler for subscriber exceptions. + */ + public EventBus(SubscriberExceptionHandler exceptionHandler) { + this("default", + Runnable::run, + Dispatcher.perThreadDispatchQueue(), + exceptionHandler); + } + + public EventBus(String identifier, + Executor executor, + Dispatcher dispatcher, + SubscriberExceptionHandler exceptionHandler) { + this.identifier = Objects.requireNonNull(identifier); + this.executor = Objects.requireNonNull(executor); + this.dispatcher = Objects.requireNonNull(dispatcher); + this.exceptionHandler = Objects.requireNonNull(exceptionHandler); + this.subscribers = new SubscriberRegistry(this); + } + + /** + * Returns the identifier for this event bus. + */ + public final String identifier() { + return identifier; + } + + /** Returns the default executor this event bus uses for dispatching events to subscribers. */ + final Executor executor() { + return executor; + } + + /** Handles the given exception thrown by a subscriber with the given context. */ + void handleSubscriberException(Throwable e, SubscriberExceptionContext context) { + Objects.requireNonNull(e); + Objects.requireNonNull(context); + try { + exceptionHandler.handleException(e, context); + } catch (Throwable e2) { + // if the handler threw an exception... well, just log it + logger.log(Level.SEVERE, + String.format(Locale.ROOT, "Exception %s thrown while handling exception: %s", e2, e), + e2); + } + } + + /** + * Registers all subscriber methods on {@code object} to receive events. + * + * @param object object whose subscriber methods should be registered. + */ + public void register(Object object) { + subscribers.register(object); + } + + /** + * Unregisters all subscriber methods on a registered {@code object}. + * + * @param object object whose subscriber methods should be unregistered. + * @throws IllegalArgumentException if the object was not previously registered. + */ + public void unregister(Object object) { + subscribers.unregister(object); + } + + /** + * Posts an event to all registered subscribers. This method will return successfully after the + * event has been posted to all subscribers, and regardless of any exceptions thrown by + * subscribers. + * + *

If no subscribers have been subscribed for {@code event}'s class, and {@code event} is not + * already a {@link DeadEvent}, it will be wrapped in a DeadEvent and reposted. + * + * @param event event to post. + */ + public void post(Object event) { + Iterator eventSubscribers = subscribers.getSubscribers(event); + if (eventSubscribers.hasNext()) { + dispatcher.dispatch(event, eventSubscribers); + } else if (!(event instanceof DeadEvent)) { + // the event had no subscribers and was not itself a DeadEvent + post(new DeadEvent(this, event)); + } + } + + @Override + public String toString() { + return "EventBus{" + identifier + "}"; + } + + /** Simple logging handler for subscriber exceptions. */ + static final class LoggingHandler implements SubscriberExceptionHandler { + static final LoggingHandler INSTANCE = new LoggingHandler(); + + @Override + public void handleException(Throwable exception, SubscriberExceptionContext context) { + Logger logger = logger(context); + if (logger.isLoggable(Level.SEVERE)) { + logger.log(Level.SEVERE, message(context), exception); + } + } + + private static Logger logger(SubscriberExceptionContext context) { + return Logger.getLogger(EventBus.class.getName() + "." + context.getEventBus().identifier()); + } + + private static String message(SubscriberExceptionContext context) { + Method method = context.getSubscriberMethod(); + return "Exception thrown by subscriber method " + + method.getName() + + '(' + + method.getParameterTypes()[0].getName() + + ')' + + " on subscriber " + + context.getSubscriber() + + " when dispatching event: " + + context.getEvent(); + } + } +} diff --git a/src/main/java/org/xbib/event/bus/Subscribe.java b/src/main/java/org/xbib/event/bus/Subscribe.java new file mode 100644 index 0000000..e3d34b3 --- /dev/null +++ b/src/main/java/org/xbib/event/bus/Subscribe.java @@ -0,0 +1,22 @@ +package org.xbib.event.bus; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marks a method as an event subscriber. + * + *

The type of event will be indicated by the method's first (and only) parameter, which cannot + * be primitive. If this annotation is applied to methods with zero parameters, or more than one + * parameter, the object containing the method will not be able to register for event delivery from + * the {@link EventBus}. + * + *

Unless also annotated with @{@link AllowConcurrentEvents}, event subscriber methods will be + * invoked serially by each event bus that they are registered with. + * + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface Subscribe {} diff --git a/src/main/java/org/xbib/event/bus/Subscriber.java b/src/main/java/org/xbib/event/bus/Subscriber.java new file mode 100644 index 0000000..517ee5b --- /dev/null +++ b/src/main/java/org/xbib/event/bus/Subscriber.java @@ -0,0 +1,122 @@ +package org.xbib.event.bus; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Objects; +import java.util.concurrent.Executor; + +/** + * A subscriber method on a specific object, plus the executor that should be used for dispatching + * events to it. + * + *

Two subscribers are equivalent when they refer to the same method on the same object (not + * class). This property is used to ensure that no subscriber method is registered more than once. + * + */ +public class Subscriber { + + /** Creates a {@code Subscriber} for {@code method} on {@code listener}. */ + static Subscriber create(EventBus bus, Object listener, Method method) { + return isDeclaredThreadSafe(method) + ? new Subscriber(bus, listener, method) + : new SynchronizedSubscriber(bus, listener, method); + } + + /** The event bus this subscriber belongs to. */ + private final EventBus bus; + + /** The object with the subscriber method. */ + final Object target; + + /** Subscriber method. */ + private final Method method; + + /** Executor to use for dispatching events to this subscriber. */ + private final Executor executor; + + private Subscriber(EventBus bus, Object target, Method method) { + this.bus = bus; + this.target = Objects.requireNonNull(target); + this.method = method; + method.setAccessible(true); + this.executor = bus.executor(); + } + + /** Dispatches {@code event} to this subscriber using the proper executor. */ + final void dispatchEvent(final Object event) { + executor.execute(() -> { + try { + invokeSubscriberMethod(event); + } catch (InvocationTargetException e) { + bus.handleSubscriberException(e.getCause(), context(event)); + } + }); + } + + /** + * Invokes the subscriber method. This method can be overridden to make the invocation + * synchronized. + */ + void invokeSubscriberMethod(Object event) throws InvocationTargetException { + try { + method.invoke(target, Objects.requireNonNull(event)); + } catch (IllegalArgumentException e) { + throw new Error("Method rejected target/argument: " + event, e); + } catch (IllegalAccessException e) { + throw new Error("Method became inaccessible: " + event, e); + } catch (InvocationTargetException e) { + if (e.getCause() instanceof Error) { + throw (Error) e.getCause(); + } + throw e; + } + } + + /** Gets the context for the given event. */ + private SubscriberExceptionContext context(Object event) { + return new SubscriberExceptionContext(bus, event, target, method); + } + + @Override + public final int hashCode() { + return (31 + method.hashCode()) * 31 + System.identityHashCode(target); + } + + @Override + public final boolean equals(Object obj) { + if (obj instanceof Subscriber) { + Subscriber that = (Subscriber) obj; + // Use == so that different equal instances will still receive events. + // We only guard against the case that the same object is registered + // multiple times + return target == that.target && method.equals(that.method); + } + return false; + } + + /** + * Checks whether {@code method} is thread-safe, as indicated by the presence of the {@link + * AllowConcurrentEvents} annotation. + */ + private static boolean isDeclaredThreadSafe(Method method) { + return method.getAnnotation(AllowConcurrentEvents.class) != null; + } + + /** + * Subscriber that synchronizes invocations of a method to ensure that only one thread may enter + * the method at a time. + */ + static final class SynchronizedSubscriber extends Subscriber { + + private SynchronizedSubscriber(EventBus bus, Object target, Method method) { + super(bus, target, method); + } + + @Override + void invokeSubscriberMethod(Object event) throws InvocationTargetException { + synchronized (this) { + super.invokeSubscriberMethod(event); + } + } + } +} diff --git a/src/main/java/org/xbib/event/bus/SubscriberExceptionContext.java b/src/main/java/org/xbib/event/bus/SubscriberExceptionContext.java new file mode 100644 index 0000000..d44689b --- /dev/null +++ b/src/main/java/org/xbib/event/bus/SubscriberExceptionContext.java @@ -0,0 +1,51 @@ +package org.xbib.event.bus; + +import java.lang.reflect.Method; +import java.util.Objects; + +/** + * Context for an exception thrown by a subscriber. + */ +public class SubscriberExceptionContext { + private final EventBus eventBus; + private final Object event; + private final Object subscriber; + private final Method subscriberMethod; + + /** + * @param eventBus The {@link EventBus} that handled the event and the subscriber. Useful for + * broadcasting a a new event based on the error. + * @param event The event object that caused the subscriber to throw. + * @param subscriber The source subscriber context. + * @param subscriberMethod the subscribed method. + */ + SubscriberExceptionContext(EventBus eventBus, Object event, Object subscriber, Method subscriberMethod) { + this.eventBus = Objects.requireNonNull(eventBus); + this.event = Objects.requireNonNull(event); + this.subscriber = Objects.requireNonNull(subscriber); + this.subscriberMethod = Objects.requireNonNull(subscriberMethod); + } + + /** + * @return The {@link EventBus} that handled the event and the subscriber. Useful for broadcasting + * a a new event based on the error. + */ + public EventBus getEventBus() { + return eventBus; + } + + /** @return The event object that caused the subscriber to throw. */ + public Object getEvent() { + return event; + } + + /** @return The object context that the subscriber was called on. */ + public Object getSubscriber() { + return subscriber; + } + + /** @return The subscribed method that threw the exception. */ + public Method getSubscriberMethod() { + return subscriberMethod; + } +} diff --git a/src/main/java/org/xbib/event/bus/SubscriberExceptionHandler.java b/src/main/java/org/xbib/event/bus/SubscriberExceptionHandler.java new file mode 100644 index 0000000..8114927 --- /dev/null +++ b/src/main/java/org/xbib/event/bus/SubscriberExceptionHandler.java @@ -0,0 +1,9 @@ +package org.xbib.event.bus; + +/** + * Handler for exceptions thrown by event subscribers. + */ +public interface SubscriberExceptionHandler { + /** Handles exceptions thrown by subscribers. */ + void handleException(Throwable exception, SubscriberExceptionContext context); +} diff --git a/src/main/java/org/xbib/event/bus/SubscriberRegistry.java b/src/main/java/org/xbib/event/bus/SubscriberRegistry.java new file mode 100644 index 0000000..f1f93d8 --- /dev/null +++ b/src/main/java/org/xbib/event/bus/SubscriberRegistry.java @@ -0,0 +1,186 @@ +package org.xbib.event.bus; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArraySet; +import org.xbib.datastructures.common.LinkedHashSetMultiMap; +import org.xbib.datastructures.common.MultiMap; +import org.xbib.event.bus.util.ConcatenatedIterator; +import org.xbib.event.bus.util.TypeToken; + +/** + * Registry of subscribers to a single event bus. + */ +final class SubscriberRegistry { + + /** + * All registered subscribers, indexed by event type. + * + *

The {@link CopyOnWriteArraySet} values make it easy and relatively lightweight to get an + * immutable snapshot of all current subscribers to an event without any locking. + */ + private final ConcurrentMap, CopyOnWriteArraySet> subscribers = + new ConcurrentHashMap<>(); + + /** The event bus this registry belongs to. */ + private final EventBus bus; + + SubscriberRegistry(EventBus bus) { + this.bus = Objects.requireNonNull(bus); + } + + /** Registers all subscriber methods on the given listener object. */ + void register(Object listener) { + + MultiMap, Subscriber> listenerMethods = findAllSubscribers(listener); + for (Entry, Collection> entry : listenerMethods.asMap().entrySet()) { + Class eventType = entry.getKey(); + Collection eventMethodsInListener = entry.getValue(); + CopyOnWriteArraySet eventSubscribers = subscribers.get(eventType); + if (eventSubscribers == null) { + CopyOnWriteArraySet newSet = new CopyOnWriteArraySet<>(); + eventSubscribers = firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); + } + + eventSubscribers.addAll(eventMethodsInListener); + } + } + + /** Unregisters all subscribers on the given listener object. */ + void unregister(Object listener) { + MultiMap, Subscriber> listenerMethods = findAllSubscribers(listener); + for (Entry, Collection> entry : listenerMethods.asMap().entrySet()) { + Class eventType = entry.getKey(); + Collection listenerMethodsForType = entry.getValue(); + CopyOnWriteArraySet currentSubscribers = subscribers.get(eventType); + if (currentSubscribers == null || !currentSubscribers.removeAll(listenerMethodsForType)) { + // if removeAll returns true, all we really know is that at least one subscriber was + // removed... however, barring something very strange we can assume that if at least one + // subscriber was removed, all subscribers on listener for that event type were... after + // all, the definition of subscribers on a particular class is totally static + throw new IllegalArgumentException( + "missing event subscriber for an annotated method. Is " + listener + " registered?"); + } + + // don't try to remove the set if it's empty; that can't be done safely without a lock + // anyway, if the set is empty it'll just be wrapping an array of length 0 + } + } + + Set getSubscribersForTesting(Class eventType) { + return firstNonNull(subscribers.get(eventType), new LinkedHashSet<>()); + } + + /** + * Gets an iterator representing an immutable snapshot of all subscribers to the given event at + * the time this method is called. + */ + Iterator getSubscribers(Object event) { + Set> eventTypes = flattenHierarchy(event.getClass()); + List> subscriberIterators = new ArrayList<>(eventTypes.size()); + for (Class eventType : eventTypes) { + CopyOnWriteArraySet eventSubscribers = subscribers.get(eventType); + if (eventSubscribers != null) { + subscriberIterators.add(eventSubscribers.iterator()); + } + } + return new ConcatenatedIterator<>(subscriberIterators.iterator()); + } + + /** + * Returns all subscribers for the given listener grouped by the type of event they subscribe to. + */ + private MultiMap, Subscriber> findAllSubscribers(Object listener) { + MultiMap, Subscriber> methodsInListener = new LinkedHashSetMultiMap<>(); + Class clazz = listener.getClass(); + for (Method method : getAnnotatedMethodsNotCached(clazz)) { + Class[] parameterTypes = method.getParameterTypes(); + Class eventType = parameterTypes[0]; + methodsInListener.put(eventType, Subscriber.create(bus, listener, method)); + } + return methodsInListener; + } + + private static Collection getAnnotatedMethodsNotCached(Class clazz) { + Set> supertypes = TypeToken.of(clazz).getTypes().rawTypes(); + Map identifiers = new HashMap<>(); + for (Class supertype : supertypes) { + for (Method method : supertype.getDeclaredMethods()) { + if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) { + Class[] parameterTypes = method.getParameterTypes(); + if (parameterTypes.length != 1) { + throw new IllegalArgumentException(String.format("Method %s has @Subscribe annotation but has %s parameters." + + "Subscriber methods must have exactly 1 parameter.", method, parameterTypes.length)); + } + if (parameterTypes[0].isPrimitive()) { + throw new IllegalArgumentException(String.format("@Subscribe method %s's parameter is %s. " + + "Subscriber methods cannot accept primitives.", + method, + parameterTypes[0].getName())); + } + MethodIdentifier ident = new MethodIdentifier(method); + if (!identifiers.containsKey(ident)) { + identifiers.put(ident, method); + } + } + } + } + return identifiers.values(); + } + + /** + * Flattens a class's type hierarchy into a set of {@code Class} objects including all + * superclasses (transitively) and all interfaces implemented by these superclasses. + */ + @SuppressWarnings("unchecked") + static Set> flattenHierarchy(Class concreteClass) { + return (Set>) TypeToken.of(concreteClass).getTypes().rawTypes(); + } + + private static final class MethodIdentifier { + + private final String name; + private final List> parameterTypes; + + MethodIdentifier(Method method) { + this.name = method.getName(); + this.parameterTypes = Arrays.asList(method.getParameterTypes()); + } + + @Override + public int hashCode() { + return Objects.hash(name, parameterTypes); + } + + @Override + public boolean equals(Object o) { + if (o instanceof MethodIdentifier) { + MethodIdentifier ident = (MethodIdentifier) o; + return name.equals(ident.name) && parameterTypes.equals(ident.parameterTypes); + } + return false; + } + } + + private static T firstNonNull(T first, T second) { + if (first != null) { + return first; + } + if (second != null) { + return second; + } + throw new NullPointerException("Both parameters are null"); + } +} diff --git a/src/main/java/org/xbib/event/bus/util/ConcatenatedIterator.java b/src/main/java/org/xbib/event/bus/util/ConcatenatedIterator.java new file mode 100644 index 0000000..6e4c643 --- /dev/null +++ b/src/main/java/org/xbib/event/bus/util/ConcatenatedIterator.java @@ -0,0 +1,100 @@ +package org.xbib.event.bus.util; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.Iterator; +import java.util.NoSuchElementException; +import static java.util.Collections.emptyIterator; + +public class ConcatenatedIterator implements Iterator { + /* The last iterator to return an element. Calls to remove() go to this iterator. */ + private Iterator toRemove; + + /* The iterator currently returning elements. */ + private Iterator iterator; + + /* + * We track the "meta iterators," the iterators-of-iterators, below. Usually, topMetaIterator + * is the only one in use, but if we encounter nested concatenations, we start a deque of + * meta-iterators rather than letting the nesting get arbitrarily deep. This keeps each + * operation O(1). + */ + + private Iterator> topMetaIterator; + + // Only becomes nonnull if we encounter nested concatenations. + private Deque>> metaIterators; + + public ConcatenatedIterator(Iterator> metaIterator) { + iterator = emptyIterator(); + topMetaIterator = metaIterator; + } + + // Returns a nonempty meta-iterator or, if all meta-iterators are empty, null. + private Iterator> getTopMetaIterator() { + while (topMetaIterator == null || !topMetaIterator.hasNext()) { + if (metaIterators != null && !metaIterators.isEmpty()) { + topMetaIterator = metaIterators.removeFirst(); + } else { + return null; + } + } + return topMetaIterator; + } + + @Override + public boolean hasNext() { + while (!iterator.hasNext()) { + // this weird checkNotNull positioning appears required by our tests, which expect + // both hasNext and next to throw NPE if an input iterator is null. + + topMetaIterator = getTopMetaIterator(); + if (topMetaIterator == null) { + return false; + } + + iterator = topMetaIterator.next(); + + if (iterator instanceof ConcatenatedIterator) { + // Instead of taking linear time in the number of nested concatenations, unpack + // them into the queue + @SuppressWarnings("unchecked") + ConcatenatedIterator topConcat = (ConcatenatedIterator) iterator; + iterator = topConcat.iterator; + + // topConcat.topMetaIterator, then topConcat.metaIterators, then this.topMetaIterator, + // then this.metaIterators + + if (this.metaIterators == null) { + this.metaIterators = new ArrayDeque<>(); + } + this.metaIterators.addFirst(this.topMetaIterator); + if (topConcat.metaIterators != null) { + while (!topConcat.metaIterators.isEmpty()) { + this.metaIterators.addFirst(topConcat.metaIterators.removeLast()); + } + } + this.topMetaIterator = topConcat.topMetaIterator; + } + } + return true; + } + + @Override + public T next() { + if (hasNext()) { + toRemove = iterator; + return iterator.next(); + } else { + throw new NoSuchElementException(); + } + } + + @Override + public void remove() { + if (toRemove != null) { + toRemove.remove(); + toRemove = null; + } + } +} diff --git a/src/main/java/org/xbib/event/bus/util/TypeToken.java b/src/main/java/org/xbib/event/bus/util/TypeToken.java new file mode 100644 index 0000000..59098a1 --- /dev/null +++ b/src/main/java/org/xbib/event/bus/util/TypeToken.java @@ -0,0 +1,163 @@ +package org.xbib.event.bus.util; + +import java.lang.reflect.Array; +import java.lang.reflect.GenericArrayType; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.lang.reflect.TypeVariable; +import java.lang.reflect.WildcardType; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +public abstract class TypeToken { + + private final Type runtimeType; + + private TypeToken(Type type) { + this.runtimeType = type; + } + + public static TypeToken of(Class type) { + return new SimpleTypeToken<>(type); + } + + public static TypeToken of(Type type) { + return new SimpleTypeToken<>(type); + } + + public final TypeSet getTypes() { + return new TypeSet(); + } + + public final Class getRawType() { + return getRawTypes().iterator().next(); + } + + private static final class SimpleTypeToken extends TypeToken { + + SimpleTypeToken(Type type) { + super(type); + } + } + + private Collection> getRawTypes() { + final List> builder = new ArrayList<>(); + new TypeVisitor() { + @Override + void visitTypeVariable(TypeVariable t) { + visit(t.getBounds()); + } + + @Override + void visitWildcardType(WildcardType t) { + visit(t.getUpperBounds()); + } + + @Override + void visitParameterizedType(ParameterizedType t) { + builder.add((Class) t.getRawType()); + } + + @Override + void visitClass(Class t) { + builder.add(t); + } + + @Override + void visitGenericArrayType(GenericArrayType t) { + builder.add(getArrayClass(of(t.getGenericComponentType()).getRawType())); + } + }.visit(runtimeType); + @SuppressWarnings({"unchecked", "rawtypes"}) + Collection> result = (Collection) builder; + return result; + } + + private abstract static class TypeCollector { + + static final TypeCollector> FOR_RAW_TYPE = + new TypeCollector<>() { + @Override + Class getRawType(Class type) { + return type; + } + + @Override + Iterable> getInterfaces(Class type) { + return Arrays.asList(type.getInterfaces()); + } + + @Override + Class getSuperclass(Class type) { + return type.getSuperclass(); + } + }; + + + Collection collectTypes(Iterable types) { + Map map = new HashMap<>(); + for (K type : types) { + collectTypes(type, map); + } + @SuppressWarnings("unchecked") + Comparator comparator = (Comparator) Comparator.naturalOrder().reversed(); + return sortKeysByValue(map, comparator); + } + + private int collectTypes(K type, Map map) { + Integer existing = map.get(type); + if (existing != null) { + return existing; + } + int aboveMe = getRawType(type).isInterface() ? 1 : 0; + for (K interfaceType : getInterfaces(type)) { + aboveMe = Math.max(aboveMe, collectTypes(interfaceType, map)); + } + K superclass = getSuperclass(type); + if (superclass != null) { + aboveMe = Math.max(aboveMe, collectTypes(superclass, map)); + } + map.put(type, aboveMe + 1); + return aboveMe + 1; + } + + private static Collection sortKeysByValue(Map map, + Comparator valueComparator) { + Comparator keyOrdering = + (left, right) -> valueComparator.compare(map.get(left), map.get(right)); + List keys = new ArrayList<>(map.keySet()); + keys.sort(keyOrdering); + return keys; + } + + abstract Class getRawType(K type); + + abstract Iterable getInterfaces(K type); + + abstract K getSuperclass(K type); + + } + + public class TypeSet { + + TypeSet() {} + + public Set> rawTypes() { + @SuppressWarnings({"unchecked", "rawtypes"}) + Collection> collectedTypes = + (Collection) TypeCollector.FOR_RAW_TYPE.collectTypes(getRawTypes()); + return new HashSet<>(collectedTypes); + } + } + + static Class getArrayClass(Class componentType) { + return Array.newInstance(componentType, 0).getClass(); + } +} diff --git a/src/main/java/org/xbib/event/bus/util/TypeVisitor.java b/src/main/java/org/xbib/event/bus/util/TypeVisitor.java new file mode 100644 index 0000000..c4d4cc0 --- /dev/null +++ b/src/main/java/org/xbib/event/bus/util/TypeVisitor.java @@ -0,0 +1,87 @@ +package org.xbib.event.bus.util; + +import java.lang.reflect.GenericArrayType; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import java.lang.reflect.TypeVariable; +import java.lang.reflect.WildcardType; +import java.util.HashSet; +import java.util.Set; + +/** + * Based on what a {@link Type} is, dispatch it to the corresponding {@code visit*} method. By + * default, no recursion is done for type arguments or type bounds. But subclasses can opt to do + * recursion by calling {@link #visit} for any {@code Type} while visitation is in progress. For + * example, this can be used to reject wildcards or type variables contained in a type as in: + * + *

{@code
+ * new TypeVisitor() {
+ *   protected void visitParameterizedType(ParameterizedType t) {
+ *     visit(t.getOwnerType());
+ *     visit(t.getActualTypeArguments());
+ *   }
+ *   protected void visitGenericArrayType(GenericArrayType t) {
+ *     visit(t.getGenericComponentType());
+ *   }
+ *   protected void visitTypeVariable(TypeVariable t) {
+ *     throw new IllegalArgumentException("Cannot contain type variable.");
+ *   }
+ *   protected void visitWildcardType(WildcardType t) {
+ *     throw new IllegalArgumentException("Cannot contain wildcard type.");
+ *   }
+ * }.visit(type);
+ * }
+ * + *

One {@code Type} is visited at most once. The second time the same type is visited, it's + * ignored by {@link #visit}. This avoids infinite recursion caused by recursive type bounds. + * + *

This class is not thread safe. + */ +abstract class TypeVisitor { + + private final Set visited = new HashSet<>(); + + /** + * Visits the given types. Null types are ignored. This allows subclasses to call {@code + * visit(parameterizedType.getOwnerType())} safely without having to check nulls. + */ + public final void visit(Type... types) { + for (Type type : types) { + if (type == null || !visited.add(type)) { + // null owner type, or already visited; + continue; + } + boolean succeeded = false; + try { + if (type instanceof TypeVariable) { + visitTypeVariable((TypeVariable) type); + } else if (type instanceof WildcardType) { + visitWildcardType((WildcardType) type); + } else if (type instanceof ParameterizedType) { + visitParameterizedType((ParameterizedType) type); + } else if (type instanceof Class) { + visitClass((Class) type); + } else if (type instanceof GenericArrayType) { + visitGenericArrayType((GenericArrayType) type); + } else { + throw new AssertionError("Unknown type: " + type); + } + succeeded = true; + } finally { + if (!succeeded) { // When the visitation failed, we don't want to ignore the second. + visited.remove(type); + } + } + } + } + + void visitClass(Class t) {} + + void visitGenericArrayType(GenericArrayType t) {} + + void visitParameterizedType(ParameterizedType t) {} + + void visitTypeVariable(TypeVariable t) {} + + void visitWildcardType(WildcardType t) {} +} diff --git a/src/main/java/org/xbib/event/clock/ClockEventConsumer.java b/src/main/java/org/xbib/event/clock/ClockEventConsumer.java index 1b0d2a7..198d968 100644 --- a/src/main/java/org/xbib/event/clock/ClockEventConsumer.java +++ b/src/main/java/org/xbib/event/clock/ClockEventConsumer.java @@ -1,10 +1,10 @@ package org.xbib.event.clock; -import com.google.common.eventbus.AllowConcurrentEvents; -import com.google.common.eventbus.Subscribe; import org.xbib.event.EventConsumer; import java.util.logging.Logger; +import org.xbib.event.bus.AllowConcurrentEvents; +import org.xbib.event.bus.Subscribe; public class ClockEventConsumer implements EventConsumer { diff --git a/src/main/java/org/xbib/event/clock/ClockEventManager.java b/src/main/java/org/xbib/event/clock/ClockEventManager.java index ea7eabe..301ab5b 100644 --- a/src/main/java/org/xbib/event/clock/ClockEventManager.java +++ b/src/main/java/org/xbib/event/clock/ClockEventManager.java @@ -1,6 +1,6 @@ package org.xbib.event.clock; -import com.google.common.eventbus.EventBus; +import org.xbib.event.bus.EventBus; import org.xbib.settings.Settings; import org.xbib.time.schedule.CronExpression; import org.xbib.time.schedule.CronSchedule; diff --git a/src/main/java/org/xbib/event/clock/ClockEventService.java b/src/main/java/org/xbib/event/clock/ClockEventService.java index a319cff..5fcc651 100644 --- a/src/main/java/org/xbib/event/clock/ClockEventService.java +++ b/src/main/java/org/xbib/event/clock/ClockEventService.java @@ -1,11 +1,10 @@ package org.xbib.event.clock; -import com.google.common.eventbus.EventBus; - import java.time.Instant; import java.util.concurrent.Callable; import java.util.logging.Level; import java.util.logging.Logger; +import org.xbib.event.bus.EventBus; public class ClockEventService implements Callable { diff --git a/src/main/java/org/xbib/event/queue/QueueEvent.java b/src/main/java/org/xbib/event/queue/QueueEvent.java index 4a84797..ef21b86 100644 --- a/src/main/java/org/xbib/event/queue/QueueEvent.java +++ b/src/main/java/org/xbib/event/queue/QueueEvent.java @@ -2,12 +2,6 @@ package org.xbib.event.queue; import org.xbib.event.Event; -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.StandardCopyOption; -import java.nio.file.attribute.FileTime; -import java.time.Instant; import java.util.Map; public class QueueEvent implements Event { @@ -16,6 +10,9 @@ public class QueueEvent implements Event { private Map map; + public QueueEvent() { + } + @Override public void setKey(String key) { this.key = key; diff --git a/src/main/java/org/xbib/event/queue/path/simple/PathQueueEventManager.java b/src/main/java/org/xbib/event/queue/path/simple/PathQueueEventManager.java index d1bea6a..4068b47 100644 --- a/src/main/java/org/xbib/event/queue/path/simple/PathQueueEventManager.java +++ b/src/main/java/org/xbib/event/queue/path/simple/PathQueueEventManager.java @@ -1,8 +1,8 @@ package org.xbib.event.queue.path.simple; -import com.google.common.eventbus.EventBus; import org.xbib.datastructures.api.TimeValue; import org.xbib.datastructures.json.tiny.Json; +import org.xbib.event.bus.EventBus; import org.xbib.settings.Settings; import java.io.IOException; diff --git a/src/main/java/org/xbib/event/queue/path/simple/PathQueueEventService.java b/src/main/java/org/xbib/event/queue/path/simple/PathQueueEventService.java index bbf55c0..f6a639d 100644 --- a/src/main/java/org/xbib/event/queue/path/simple/PathQueueEventService.java +++ b/src/main/java/org/xbib/event/queue/path/simple/PathQueueEventService.java @@ -1,6 +1,5 @@ package org.xbib.event.queue.path.simple; -import com.google.common.eventbus.EventBus; import org.xbib.datastructures.json.tiny.Json; import java.io.IOException; @@ -13,6 +12,7 @@ import java.util.LinkedHashMap; import java.util.Objects; import java.util.logging.Level; import java.util.logging.Logger; +import org.xbib.event.bus.EventBus; import static org.xbib.event.queue.path.simple.PathQueueEvent.INCOMING; diff --git a/src/main/java/org/xbib/event/queue/path/watch/PathQueueEventManager.java b/src/main/java/org/xbib/event/queue/path/watch/PathQueueEventManager.java index 2c1c699..7cad8a8 100644 --- a/src/main/java/org/xbib/event/queue/path/watch/PathQueueEventManager.java +++ b/src/main/java/org/xbib/event/queue/path/watch/PathQueueEventManager.java @@ -1,8 +1,8 @@ package org.xbib.event.queue.path.watch; -import com.google.common.eventbus.EventBus; import org.xbib.datastructures.api.TimeValue; import org.xbib.datastructures.json.tiny.Json; +import org.xbib.event.bus.EventBus; import org.xbib.settings.Settings; import java.io.Closeable; diff --git a/src/main/java/org/xbib/event/queue/path/watch/PathQueueEventService.java b/src/main/java/org/xbib/event/queue/path/watch/PathQueueEventService.java index 0e01884..157d3ba 100644 --- a/src/main/java/org/xbib/event/queue/path/watch/PathQueueEventService.java +++ b/src/main/java/org/xbib/event/queue/path/watch/PathQueueEventService.java @@ -1,6 +1,5 @@ package org.xbib.event.queue.path.watch; -import com.google.common.eventbus.EventBus; import org.xbib.datastructures.json.tiny.Json; import java.io.Closeable; @@ -20,6 +19,7 @@ import java.util.Objects; import java.util.concurrent.Callable; import java.util.logging.Level; import java.util.logging.Logger; +import org.xbib.event.bus.EventBus; import static org.xbib.event.queue.path.watch.PathQueueEvent.INCOMING; diff --git a/src/main/java/org/xbib/event/syslog/Facility.java b/src/main/java/org/xbib/event/syslog/Facility.java new file mode 100644 index 0000000..334e4a4 --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/Facility.java @@ -0,0 +1,193 @@ +package org.xbib.event.syslog; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; + +/** + * Syslog facility as defined in RFC 5424 - The Syslog Protocol. + * See RFC 5427 - Textual Conventions for Syslog Management for the {@link #label}. + */ +public enum Facility implements Comparable { + + /** + * Kernel messages, numerical code 0. + */ + KERN(0, "KERN"), + /** + * User-level messages, numerical code 1. + */ + USER(1, "USER"), + /** + * Mail system, numerical code 2. + */ + MAIL(2, "MAIL"), + /** + * System daemons, numerical code 3. + */ + DAEMON(3, "DAEMON"), + /** + * Security/authorization messages, numerical code 4. + */ + AUTH(4, "AUTH"), + /** + * Messages generated internally by syslogd, numerical code 5. + */ + SYSLOG(5, "SYSLOG"), + /** + * Line printer subsystem, numerical code 6. + */ + LPR(6, "LPR"), + /** + * Network news subsystem, numerical code 7. + */ + NEWS(7, "NEWS"), + /** + * UUCP subsystem, numerical code 8 + */ + UUCP(8, "UUCP"), + /** + * Clock daemon, numerical code 9. + */ + CRON(9, "CRON"), + /** + * Security/authorization messages, numerical code 10. + */ + AUTHPRIV(10, "AUTHPRIV"), + /** + * FTP daemon, numerical code 11. + */ + FTP(11, "FTP"), + /** + * NTP subsystem, numerical code 12. + */ + NTP(12, "NTP"), + /** + * Log audit, numerical code 13. + */ + AUDIT(13, "AUDIT"), + /** + * Log alert, numerical code 14. + */ + ALERT(14, "ALERT"), + /** + * Clock daemon, numerical code 15. + */ + CLOCK(15, "CLOCK"), + /** + * Reserved for local use, numerical code 16. + */ + LOCAL0(16, "LOCAL0"), + /** + * Reserved for local use, numerical code 17. + */ + LOCAL1(17, "LOCAL1"), + /** + * Reserved for local use, numerical code 18. + */ + LOCAL2(18, "LOCAL2"), + /** + * Reserved for local use, numerical code 19. + */ + LOCAL3(19, "LOCAL3"), + /** + * Reserved for local use, numerical code 20. + */ + LOCAL4(20, "LOCAL4"), + /** + * Reserved for local use, numerical code 21. + */ + LOCAL5(21, "LOCAL5"), + /** + * Reserved for local use, numerical code 22. + */ + LOCAL6(22, "LOCAL6"), + /** + * Reserved for local use, numerical code 23. + */ + LOCAL7(23, "LOCAL7"); + + private final static Map facilityFromLabel = new HashMap(); + + private final static Map facilityFromNumericalCode = new HashMap(); + + static { + for (Facility facility : Facility.values()) { + facilityFromLabel.put(facility.label, facility); + facilityFromNumericalCode.put(facility.numericalCode, facility); + } + } + + /** + * Syslog facility numerical code + */ + private final int numericalCode; + /** + * Syslog facility textual code. Not {@code null} + */ + private final String label; + + private Facility(int numericalCode, String label) { + this.numericalCode = numericalCode; + this.label = label; + } + + /** + * @param numericalCode Syslog facility numerical code + * @return Syslog facility, not {@code null} + * @throws IllegalArgumentException the given numericalCode is not a valid Syslog facility numerical code + */ + public static Facility fromNumericalCode(int numericalCode) throws IllegalArgumentException { + Facility facility = facilityFromNumericalCode.get(numericalCode); + if (facility == null) { + throw new IllegalArgumentException("Invalid facility '" + numericalCode + "'"); + } + return facility; + } + + /** + * @param label Syslog facility textual code. {@code null} or empty returns {@code null} + * @return Syslog facility, {@code null} if given value is {@code null} + * @throws IllegalArgumentException the given value is not a valid Syslog facility textual code + */ + public static Facility fromLabel(String label) throws IllegalArgumentException { + if (label == null || label.isEmpty()) { + return null; + } + + Facility facility = facilityFromLabel.get(label); + if (facility == null) { + throw new IllegalArgumentException("Invalid facility '" + label + "'"); + } + return facility; + } + + /** + * Syslog facility numerical code + * @return numerical code + */ + public int numericalCode() { + return numericalCode; + } + + /** + * Syslog facility textual code. Not {@code null}. + * @return label + */ + public String label() { + return label; + } + + /** + * Compare on {@link Facility#numericalCode()} + * @return comparator for facilities + */ + public static Comparator comparator() { + return new Comparator() { + @Override + public int compare(Facility f1, Facility f2) { + return Integer.compare(f1.numericalCode, f2.numericalCode); + } + }; + } +} diff --git a/src/main/java/org/xbib/event/syslog/JsonParser.java b/src/main/java/org/xbib/event/syslog/JsonParser.java new file mode 100644 index 0000000..c87cbf4 --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/JsonParser.java @@ -0,0 +1,346 @@ +package org.xbib.event.syslog; + +import java.io.IOException; +import java.io.Reader; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +class JsonParser { + + private static final int DEFAULT_BUFFER_SIZE = 1024; + + private final Reader reader; + + private final char[] buf; + + private int index; + + private int fill; + + private int ch; + + private StringBuilder sb; + + private int start; + + public JsonParser(Reader reader) { + this(reader, DEFAULT_BUFFER_SIZE); + } + + public JsonParser(Reader reader, int buffersize) { + this.reader = reader; + buf = new char[buffersize]; + start = -1; + } + + public Object parse() throws IOException { + read(); + skipBlank(); + Object result = parseValue(); + skipBlank(); + if (ch != -1) { + throw new IOException("unexpected character: " + ch); + } + return result; + } + + private Object parseValue() throws IOException { + switch (ch) { + case 'n': + return parseNull(); + case 't': + return parseTrue(); + case 'f': + return parseFalse(); + case '"': + return parseString(); + case '[': + return parseList(); + case '{': + return parseMap(); + case '-': + case '+': + case '0': + case '1': + case '2': + case '3': + case '4': + case '5': + case '6': + case '7': + case '8': + case '9': + return parseNumber(); + } + throw new IOException("value"); + } + + private List parseList() throws IOException { + read(); + List list = new ArrayList(); + skipBlank(); + if (parseChar(']')) { + return list; + } + do { + skipBlank(); + list.add(parseValue()); + skipBlank(); + } while (parseChar(',')); + if (!parseChar(']')) { + expected("',' or ']'"); + } + return list; + } + + private Map parseMap() throws IOException { + read(); + Map object = new LinkedHashMap(); + skipBlank(); + if (parseChar('}')) { + return object; + } + do { + skipBlank(); + if (ch != '"') { + expected("name"); + } + String name = parseString(); + skipBlank(); + if (!parseChar(':')) { + expected("':'"); + } + skipBlank(); + object.put(name, parseValue()); + skipBlank(); + } while (parseChar(',')); + if (!parseChar('}')) { + expected("',' or '}'"); + } + return object; + } + + private Object parseNull() throws IOException { + read(); + checkForChar('u'); + checkForChar('l'); + checkForChar('l'); + return null; + } + + private Object parseTrue() throws IOException { + read(); + checkForChar('r'); + checkForChar('u'); + checkForChar('e'); + return Boolean.TRUE; + } + + private Object parseFalse() throws IOException { + read(); + checkForChar('a'); + checkForChar('l'); + checkForChar('s'); + checkForChar('e'); + return Boolean.FALSE; + } + + private void checkForChar(char ch) throws IOException { + if (!parseChar(ch)) { + expected("'" + ch + "'"); + } + } + + private String parseString() throws IOException { + read(); + startCapture(); + while (ch != '"') { + if (ch == '\\') { + pauseCapture(); + parseEscaped(); + startCapture(); + } else if (ch < 0x20) { + expected("valid string character"); + } else { + read(); + } + } + String s = endCapture(); + read(); + return s; + } + + private void parseEscaped() throws IOException { + read(); + switch (ch) { + case '"': + case '/': + case '\\': + sb.append((char) ch); + break; + case 'b': + sb.append('\b'); + break; + case 't': + sb.append('\t'); + break; + case 'f': + sb.append('\f'); + break; + case 'n': + sb.append('\n'); + break; + case 'r': + sb.append('\r'); + break; + case 'u': + char[] hex = new char[4]; + for (int i = 0; i < 4; i++) { + read(); + if (!isHexDigit()) { + expected("hexadecimal digit"); + } + hex[i] = (char) ch; + } + sb.append((char) Integer.parseInt(String.valueOf(hex), 16)); + break; + default: + expected("valid escape sequence"); + } + read(); + } + + private Object parseNumber() throws IOException { + startCapture(); + parseChar('-'); + int firstDigit = ch; + if (!parseDigit()) { + expected("digit"); + } + if (firstDigit != '0') { + while (parseDigit()) { + } + } + parseFraction(); + parseExponent(); + return endCapture(); + } + + private boolean parseFraction() throws IOException { + if (!parseChar('.')) { + return false; + } + if (!parseDigit()) { + expected("digit"); + } + while (parseDigit()) { + } + return true; + } + + private boolean parseExponent() throws IOException { + if (!parseChar('e') && !parseChar('E')) { + return false; + } + if (!parseChar('+')) { + parseChar('-'); + } + if (!parseDigit()) { + expected("digit"); + } + while (parseDigit()) { + } + return true; + } + + private boolean parseChar(char ch) throws IOException { + if (this.ch != ch) { + return false; + } + read(); + return true; + } + + private boolean parseDigit() throws IOException { + if (!isDigit()) { + return false; + } + read(); + return true; + } + + private void skipBlank() throws IOException { + while (isWhiteSpace()) { + read(); + } + } + + private void read() throws IOException { + if (ch == -1) { + throw new IOException("unexpected end of input"); + } + if (index == fill) { + if (start != -1) { + sb.append(buf, start, fill - start); + start = 0; + } + fill = reader.read(buf, 0, buf.length); + index = 0; + if (fill == -1) { + ch = -1; + return; + } + } + ch = buf[index++]; + } + + private void startCapture() { + if (sb == null) { + sb = new StringBuilder(); + } + start = index - 1; + } + + private void pauseCapture() { + int end = ch == -1 ? index : index - 1; + sb.append(buf, start, end - start); + start = -1; + } + + private String endCapture() { + int end = ch == -1 ? index : index - 1; + String captured; + if (sb.length() > 0) { + sb.append(buf, start, end - start); + captured = sb.toString(); + sb.setLength(0); + } else { + captured = new String(buf, start, end - start); + } + start = -1; + return captured; + } + + private boolean isWhiteSpace() { + return ch == ' ' || ch == '\t' || ch == '\n' || ch == '\r'; + } + + private boolean isDigit() { + return ch >= '0' && ch <= '9'; + } + + private boolean isHexDigit() { + return ch >= '0' && ch <= '9' + || ch >= 'a' && ch <= 'f' + || ch >= 'A' && ch <= 'F'; + } + + private void expected(String expected) throws IOException { + if (ch == -1) { + throw new IOException("unexpected end of input"); + } + throw new IOException("expected " + expected); + } +} diff --git a/src/main/java/org/xbib/event/syslog/MessageParser.java b/src/main/java/org/xbib/event/syslog/MessageParser.java new file mode 100644 index 0000000..973856a --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/MessageParser.java @@ -0,0 +1,231 @@ +package org.xbib.event.syslog; + +import java.io.IOException; +import java.io.StringReader; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; +import java.time.temporal.ChronoUnit; +import java.time.temporal.TemporalAccessor; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import org.xbib.datastructures.api.Builder; + +/** + * Parses a syslog message with RFC 3164 or RFC 5424 date format + */ +public class MessageParser { + + private final static Pattern TWO_SPACES = Pattern.compile(" "); + + private final static DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME; + + private final static DateTimeFormatter rfc3164Format = + DateTimeFormatter.ofPattern("MMM d HH:mm:ss").withZone(ZoneId.of("UTC")); + + private final static int RFC3164_LEN = 15; + + private final static int RFC5424_PREFIX_LEN = 19; + + private final DateTimeFormatter timeParser; + + private final Map timestampCache; + + private final Map fieldNames = new HashMap<>() {{ + put("host", "host"); + put("facility", "facility"); + put("severity", "severity"); + put("timestamp", "timestamp"); + put("message", "message"); + }}; + + private Map patterns; + + public MessageParser() { + timeParser = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss").withZone(ZoneId.of("UTC")); + timestampCache = new LinkedHashMap<>(); + /*CacheBuilder.newBuilder().maximumSize(1000).build( + new CacheLoader<>() { + + @Override + public LocalDateTime load(String key) { + return LocalDateTime.parse(key, timeParser); + } + });*/ + } + + public MessageParser setPatterns(Map patterns) { + this.patterns = patterns; + return this; + } + + public MessageParser setFieldName(String name, String newName) { + fieldNames.put(name, newName); + return this; + } + + @SuppressWarnings("unchecked") + public void parseMessage(String msg, Builder builder) throws IOException { + int msgLen = msg.length(); + int pos = 0; + if (msg.charAt(pos) != '<') { + throw new IllegalArgumentException("bad format: invalid priority: cannot find open bracket '<' " + msg); + } + int end = msg.indexOf('>'); + if (end < 0 || end > 6) { + throw new IllegalArgumentException("bad format: invalid priority: cannot find end bracket '>' " + msg); + } + int pri = Integer.parseInt(msg.substring(1, end)); + Facility facility = Facility.fromNumericalCode(pri / 8); + Severity severity = Severity.fromNumericalCode(pri % 8); + builder.field(fieldNames.get("facility"), facility.label()) + .field(fieldNames.get("severity"), severity.label()); + if (msgLen <= end + 1) { + throw new IllegalArgumentException("bad format: no data except priority " + msg); + } + pos = end + 1; + if (msgLen > pos + 2 && "1 ".equals(msg.substring(pos, pos + 2))) { + pos += 2; + } + TemporalAccessor timestamp; + char ch = msg.charAt(pos); + if (ch == '-') { + timestamp = LocalDateTime.now(); + if (msgLen <= pos + 2) { + throw new IllegalArgumentException("bad syslog format (missing hostname)"); + } + pos += 2; + } else if (ch >= 'A' && ch <= 'Z') { + if (msgLen <= pos + RFC3164_LEN) { + throw new IllegalArgumentException("bad timestamp format"); + } + timestamp = parseRFC3164Time(msg.substring(pos, pos + RFC3164_LEN)); + pos += RFC3164_LEN + 1; + } else { + int sp = msg.indexOf(' ', pos); + if (sp == -1) { + throw new IllegalArgumentException("bad timestamp format"); + } + timestamp = parseRFC5424Date(msg.substring(pos, sp)); + pos = sp + 1; + } + builder.field(fieldNames.get("timestamp"), formatter.format(timestamp)); + int ns = msg.indexOf(' ', pos); + if (ns == -1) { + throw new IllegalArgumentException("bad syslog format (missing hostname)"); + } + String hostname = msg.substring(pos, ns); + builder.field(fieldNames.get("host"), hostname); + + String data; + if (msgLen > ns + 1) { + pos = ns + 1; + data = msg.substring(pos); + } else { + data = msg; + } + try { + if (data.startsWith("@cee:")) { + data = data.substring(5); + } + JsonParser parser = new JsonParser(new StringReader(data)); + Map map = (Map)parser.parse(); + builder.buildMap(map); + } catch (Throwable t) { + // ignore + } + String message = fieldNames.get("message"); + builder.field(message, data); + if (patterns != null) { + for (Map.Entry entry : patterns.entrySet()) { + Matcher m = entry.getValue().matcher(data); + if (m.find()) { + builder.field(entry.getKey(), m.group(1)); + } + } + } + } + + private LocalDateTime parseRFC5424Date(String msg) { + int len = msg.length(); + if (len <= RFC5424_PREFIX_LEN) { + throw new IllegalArgumentException("bad format: not a valid RFC5424 timestamp: " + msg); + } + String timestampPrefix = msg.substring(0, RFC5424_PREFIX_LEN); + LocalDateTime timestamp = timestampCache.get(timestampPrefix); + int pos = RFC5424_PREFIX_LEN; + if (timestamp == null) { + throw new IllegalArgumentException("parse error: timestamp is null"); + } + if (msg.charAt(pos) == '.') { + boolean found = false; + int end = pos + 1; + if (len <= end) { + throw new IllegalArgumentException("bad timestamp format (no TZ)"); + } + while (!found) { + char ch = msg.charAt(end); + if (ch >= '0' && ch <= '9') { + end++; + } else { + found = true; + } + } + if (end - (pos + 1) > 0) { + long milliseconds = (long) (Double.parseDouble(msg.substring(pos, end)) * 1000.0); + timestamp.plus(milliseconds, ChronoUnit.MILLIS); + } else { + throw new IllegalArgumentException("bad format: invalid timestamp (fractional portion): " + msg); + } + pos = end; + } + char ch = msg.charAt(pos); + if (ch != 'Z') { + if (ch == '+' || ch == '-') { + if (len <= pos + 5) { + throw new IllegalArgumentException("bad format: invalid timezone: " + msg); + } + int sign = ch == '+' ? +1 : -1; + char[] hourzone = new char[5]; + for (int i = 0; i < 5; i++) { + hourzone[i] = msg.charAt(pos + 1 + i); + } + if (hourzone[0] >= '0' && hourzone[0] <= '9' + && hourzone[1] >= '0' && hourzone[1] <= '9' + && hourzone[2] == ':' + && hourzone[3] >= '0' && hourzone[3] <= '9' + && hourzone[4] >= '0' && hourzone[4] <= '9') { + int hourOffset = Integer.parseInt(msg.substring(pos + 1, pos + 3)); + int minOffset = Integer.parseInt(msg.substring(pos + 4, pos + 6)); + timestamp.minus(sign * ((hourOffset * 60L) + minOffset) * 60000, ChronoUnit.MILLIS); + } else { + throw new IllegalArgumentException("bad format: invalid timezone: " + msg); + } + } + } + return timestamp; + } + + private LocalDateTime parseRFC3164Time(String timestamp) { + LocalDateTime now = LocalDateTime.now(); + int year = now.getYear(); + timestamp = TWO_SPACES.matcher(timestamp).replaceFirst(" "); + LocalDateTime date; + try { + date = LocalDateTime.parse(timestamp, rfc3164Format); + } catch (Exception e) { + return LocalDateTime.MIN; + } + LocalDateTime fixed = date.withYear(year); + if (fixed.isAfter(now) && fixed.minusMonths(1).isAfter(now)) { + fixed = date.withYear(year - 1); + } else if (fixed.isBefore(now) && fixed.plusMonths(1).isBefore(now)) { + fixed = date.withYear(year + 1); + } + return fixed; + } + +} diff --git a/src/main/java/org/xbib/event/syslog/Severity.java b/src/main/java/org/xbib/event/syslog/Severity.java new file mode 100644 index 0000000..d851da2 --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/Severity.java @@ -0,0 +1,123 @@ +package org.xbib.event.syslog; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; + +/** + * Syslog severity as defined in RFC 5424 - The Syslog Protocol. + */ +public enum Severity { + /** + * Emergency: system is unusable, numerical code 0. + */ + EMERGENCY(0, "EMERGENCY"), + /** + * Alert: action must be taken immediately, numerical code 1. + */ + ALERT(1, "ALERT"), + /** + * Critical: critical conditions, numerical code 2. + */ + CRITICAL(2, "CRITICAL"), + /** + * Error: error conditions, numerical code 3. + */ + ERROR(3, "ERROR"), + /** + * Warning: warning conditions, numerical code 4. + */ + WARNING(4, "WARNING"), + /** + * Notice: normal but significant condition, numerical code 5. + */ + NOTICE(5, "NOTICE"), + /** + * Informational: informational messages, numerical code 6. + */ + INFORMATIONAL(6, "INFORMATIONAL"), + /** + * Debug: debug-level messages, numerical code 7. + */ + DEBUG(7, "DEBUG"); + + private final static Map severityFromLabel = new HashMap(); + + private final static Map severityFromNumericalCode = new HashMap(); + + static { + for (Severity severity : Severity.values()) { + severityFromLabel.put(severity.label, severity); + severityFromNumericalCode.put(severity.numericalCode, severity); + } + } + + private final int numericalCode; + + private final String label; + + private Severity(int numericalCode, String label) { + this.numericalCode = numericalCode; + this.label = label; + } + + /** + * @param numericalCode Syslog severity numerical code + * @return Syslog severity, not {@code null} + * @throws IllegalArgumentException the given numericalCode is not a valid Syslog severity numerical code + */ + public static Severity fromNumericalCode(int numericalCode) throws IllegalArgumentException { + Severity severity = severityFromNumericalCode.get(numericalCode); + if (severity == null) { + throw new IllegalArgumentException("Invalid severity '" + numericalCode + "'"); + } + return severity; + } + + /** + * @param label Syslog severity textual code. {@code null} or empty returns {@code null} + * @return Syslog severity, {@code null} if given value is {@code null} + * @throws IllegalArgumentException the given value is not a valid Syslog severity textual code + */ + public static Severity fromLabel(String label) throws IllegalArgumentException { + if (label == null || label.isEmpty()) { + return null; + } + + Severity severity = severityFromLabel.get(label); + if (severity == null) { + throw new IllegalArgumentException("Invalid severity '" + label + "'"); + } + return severity; + } + + /** + * Syslog severity numerical code + * @return numerical code + */ + public int numericalCode() { + return numericalCode; + } + + /** + * Syslog severity textual code. Not {@code null}. + * @return the severity label + */ + public String label() { + return label; + } + + /** + * Compare on {@link Severity#numericalCode()} + * @return comparator for severities + */ + public static Comparator comparator() { + return new Comparator() { + @Override + public int compare(Severity s1, Severity s2) { + return Integer.compare(s1.numericalCode, s2.numericalCode); + } + }; + } +} + diff --git a/src/main/java/org/xbib/event/syslog/SyslogService.java b/src/main/java/org/xbib/event/syslog/SyslogService.java new file mode 100644 index 0000000..36d381c --- /dev/null +++ b/src/main/java/org/xbib/event/syslog/SyslogService.java @@ -0,0 +1,249 @@ +package org.xbib.event.syslog; + +import java.io.IOException; +import java.net.BindException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.time.format.DateTimeFormatter; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Logger; +import java.util.regex.Pattern; +import org.xbib.datastructures.api.Builder; +import org.xbib.datastructures.api.ByteSizeUnit; +import org.xbib.datastructures.api.ByteSizeValue; +import org.xbib.datastructures.api.TimeValue; +import org.xbib.settings.Settings; + +public class SyslogService { +/* + private static final Logger logger = Logger.getLogger(SyslogService.class.getName()); + + private final static String SYSLOG_HOST = "syslog.host"; + + private final static String SYSLOG_PORT = "syslog.port"; + + private final static String SYSLOG_RECEIVE_BUFFER_SIZE = "receive_buffer_size"; + + private final static String SYSLOG_PATTERNS = "patterns"; + + private final static String SYSLOG_FIELD_NAMES = "field_names"; + + private final String host; + + private final String port; + + private final ByteSizeValue receiveBufferSize; + + private final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory; + + private final MessageParser messageParser; + + private DateTimeFormatter formatter; + + private ConnectionlessBootstrap udpBootstrap; + + private ServerBootstrap tcpBootstrap; + + private Channel udpChannel; + + private Channel tcpChannel; + + public SyslogService(Settings settings) { + this.host = settings.get(SYSLOG_HOST, "127.0.0.1"); + this.port = settings.get(SYSLOG_PORT, "9500-9600"); + this.receiveBufferSize = settings.getAsBytesSize(SYSLOG_RECEIVE_BUFFER_SIZE, new ByteSizeValue(10, ByteSizeUnit.MB)); + this.receiveBufferSizePredictorFactory = new FixedReceiveBufferSizePredictorFactory(receiveBufferSize.bytesAsInt()); + Map map = (Map) settings.getAsStructuredMap().get(SYSLOG_PATTERNS); + Map patterns = new HashMap<>(); + if (map != null) { + for (String key : map.keySet()) { + patterns.put(key, Pattern.compile((String) map.get(key))); + } + } + this.messageParser = new MessageParser().setPatterns(patterns); + map = (Map) settings.getAsStructuredMap().get(SYSLOG_FIELD_NAMES); + if (map != null) { + for (String key : map.keySet()) { + messageParser.setFieldName(key, (String) map.get(key)); + } + } + logger.info("syslog server: host [" + host + "], port [" + port + "]"); + } + + protected void doStart() throws Exception { + initializeUDP(); + initializeTCP(); + logger.info("syslog server up"); + } + + protected void doStop() throws ElasticsearchException { + if (udpChannel != null) { + udpChannel.close().awaitUninterruptibly(); + } + if (udpBootstrap != null) { + udpBootstrap.releaseExternalResources(); + } + if (tcpChannel != null) { + tcpChannel.close().awaitUninterruptibly(); + } + if (tcpBootstrap != null) { + tcpBootstrap.releaseExternalResources(); + } + bulkProcessor.close(); + logger.info("syslog server down"); + } + + private void initializeUDP() { + udpBootstrap = new ConnectionlessBootstrap(new NioDatagramChannelFactory( + Executors.newCachedThreadPool(), 4)); + udpBootstrap.setOption("receiveBufferSize", receiveBufferSize.bytesAsInt()); + udpBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory); + udpBootstrap.setOption("broadcast", "false"); + udpBootstrap.setPipelineFactory(new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() throws Exception { + return Channels.pipeline(new Handler("udp")); + } + }); + InetAddress address; + try { + address = NetworkUtils.resolveInetAddress(host, null); + } catch (IOException e) { + logger.warn("failed to resolve host {}", e, host); + return; + } + final InetAddress hostAddress = address; + PortsRange portsRange = new PortsRange(port); + final AtomicReference lastException = new AtomicReference<>(); + boolean success = portsRange.iterate(new PortsRange.PortCallback() { + @Override + public boolean onPortNumber(int portNumber) { + try { + udpChannel = udpBootstrap.bind(new InetSocketAddress(hostAddress, portNumber)); + } catch (Exception e) { + lastException.set(e); + return false; + } + return true; + } + }); + if (!success) { + logger.warn("failed to bind to {}/{}", lastException.get(), hostAddress, port); + return; + } + logger.info("UDP listener running, address {}", udpChannel.getLocalAddress()); + } + + private void initializeTCP() { + tcpBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory( + Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), + settings.getAsInt("tcp.worker", 4))); + + tcpBootstrap.setOption("receiveBufferSize", receiveBufferSize.bytesAsInt()); + tcpBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory); + tcpBootstrap.setOption("reuseAddress", settings.getAsBoolean("tcp.reuse_address", true)); + tcpBootstrap.setOption("tcpNoDelay", settings.getAsBoolean("tcp.no_delay", true)); + tcpBootstrap.setOption("keepAlive", settings.getAsBoolean("tcp.keep_alive", true)); + + tcpBootstrap.setOption("child.receiveBufferSize", receiveBufferSize.bytesAsInt()); + tcpBootstrap.setOption("child.receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory); + tcpBootstrap.setOption("child.reuseAddress", settings.getAsBoolean("tcp.reuse_address", true)); + tcpBootstrap.setOption("child.tcpNoDelay", settings.getAsBoolean("tcp.no_delay", true)); + tcpBootstrap.setOption("child.keepAlive", settings.getAsBoolean("tcp.keep_alive", true)); + + tcpBootstrap.setPipelineFactory(new ChannelPipelineFactory() { + @Override + public ChannelPipeline getPipeline() throws Exception { + return Channels.pipeline(new Handler("tcp")); + } + }); + + InetAddress address; + try { + address = SyslogNetworkUtils.resolveInetAddress(host, null); + } catch (IOException e) { + logger.warn("failed to resolve host {}", e, host); + return; + } + final InetAddress hostAddress = address; + PortsRange portsRange = new PortsRange(port); + final AtomicReference lastException = new AtomicReference<>(); + boolean success = portsRange.iterate(new PortsRange.PortCallback() { + @Override + public boolean onPortNumber(int portNumber) { + try { + tcpChannel = tcpBootstrap.bind(new InetSocketAddress(hostAddress, portNumber)); + } catch (Exception e) { + lastException.set(e); + return false; + } + return true; + } + }); + if (!success) { + logger.warn("failed to bind to {}/{}", lastException.get(), hostAddress, port); + return; + } + logger.info("TCP listener running, address {}", tcpChannel.getLocalAddress()); + } + + class Handler extends SimpleChannelUpstreamHandler { + + private final String protocol; + + Handler(String protocol) { + this.protocol = protocol; + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { + ChannelBuffer buffer = (ChannelBuffer) e.getMessage(); + XContentBuilder builder = jsonBuilder(); + parse(ctx, buffer, builder); + IndexRequest indexRequest = new IndexRequest(isTimeWindow ? formatter.print(new DateTime()) : index) + .type(type) + .opType(IndexRequest.OpType.INDEX) + .source(builder); + try { + bulkProcessor.add(indexRequest); + } catch (Exception e1) { + logger.warn("failed to execute bulk request", e1); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { + if (e.getCause() instanceof BindException) { + // ignore, this happens when we retry binding to several ports, its fine if we fail... + return; + } + logger.warn("failure caught", e.getCause()); + throw new IOException(e.getCause()); + } + + private void parse(ChannelHandlerContext ctx, ChannelBuffer buffer, Builder builder) throws IOException { + SocketAddress localAddress = ctx.getChannel().getLocalAddress(); + SocketAddress remoteAddress = ctx.getChannel().getRemoteAddress(); + ChannelBufferBytesReference ref = new ChannelBufferBytesReference(buffer); + try { + builder.startObject(); + builder.field("protocol", protocol); + if (localAddress != null) { + builder.field("local", localAddress.toString()); + } + if (remoteAddress != null) { + builder.field("remote", remoteAddress.toString()); + } + messageParser.parseMessage(ref.toUtf8(), builder); + builder.endObject(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + } +*/ +} diff --git a/src/main/java/org/xbib/event/timer/TimerEventManager.java b/src/main/java/org/xbib/event/timer/TimerEventManager.java index 9f43529..e0e6a1e 100644 --- a/src/main/java/org/xbib/event/timer/TimerEventManager.java +++ b/src/main/java/org/xbib/event/timer/TimerEventManager.java @@ -1,6 +1,6 @@ package org.xbib.event.timer; -import com.google.common.eventbus.EventBus; +import org.xbib.event.bus.EventBus; import org.xbib.event.persistence.FilePersistenceStore; import org.xbib.event.persistence.PersistenceStore; import org.xbib.settings.Settings; diff --git a/src/main/java/org/xbib/event/timer/TimerEventService.java b/src/main/java/org/xbib/event/timer/TimerEventService.java index 2742f16..87e7c40 100644 --- a/src/main/java/org/xbib/event/timer/TimerEventService.java +++ b/src/main/java/org/xbib/event/timer/TimerEventService.java @@ -1,6 +1,6 @@ package org.xbib.event.timer; -import com.google.common.eventbus.EventBus; +import org.xbib.event.bus.EventBus; import org.xbib.event.persistence.PersistenceStore; import java.io.Closeable; diff --git a/src/main/java/org/xbib/event/yield/AsyncQuery.java b/src/main/java/org/xbib/event/yield/AsyncQuery.java index 83cb41b..03dda60 100644 --- a/src/main/java/org/xbib/event/yield/AsyncQuery.java +++ b/src/main/java/org/xbib/event/yield/AsyncQuery.java @@ -23,6 +23,9 @@ import java.util.function.Predicate; */ public abstract class AsyncQuery implements AsyncTraverser { + public AsyncQuery() { + } + /** * Returns an asynchronous sequential ordered query whose elements * are the specified values in data parameter. diff --git a/src/test/java/org/xbib/event/bus/AsyncEventBusTest.java b/src/test/java/org/xbib/event/bus/AsyncEventBusTest.java new file mode 100644 index 0000000..5a7d48c --- /dev/null +++ b/src/test/java/org/xbib/event/bus/AsyncEventBusTest.java @@ -0,0 +1,62 @@ +package org.xbib.event.bus; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Executor; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class AsyncEventBusTest { + + private static final String EVENT = "Hello"; + + private FakeExecutor executor; + + private AsyncEventBus bus; + + @BeforeEach + protected void setUp() { + executor = new FakeExecutor(); + bus = new AsyncEventBus(executor); + } + + @Test + public void testBasicDistribution() { + StringCatcher catcher = new StringCatcher(); + bus.register(catcher); + + // We post the event, but our Executor will not deliver it until instructed. + bus.post(EVENT); + + List events = catcher.getEvents(); + assertTrue(events.isEmpty(), "No events should be delivered synchronously."); + + // Now we find the task in our Executor and explicitly activate it. + List tasks = executor.getTasks(); + assertEquals(1, tasks.size(), "One event dispatch task should be queued."); + + tasks.get(0).run(); + + assertEquals(1, events.size(), "One event should be delivered."); + assertEquals(EVENT, events.get(0), "Correct string should be delivered."); + } + + /** + * An {@link Executor} wanna-be that simply records the tasks it's given. Arguably the Worst + * Executor Ever. + */ + public static class FakeExecutor implements Executor { + List tasks = new ArrayList<>(); + + @Override + public void execute(Runnable task) { + tasks.add(task); + } + + public List getTasks() { + return tasks; + } + } +} diff --git a/src/test/java/org/xbib/event/bus/EventBusTest.java b/src/test/java/org/xbib/event/bus/EventBusTest.java new file mode 100644 index 0000000..5f1420e --- /dev/null +++ b/src/test/java/org/xbib/event/bus/EventBusTest.java @@ -0,0 +1,308 @@ +package org.xbib.event.bus; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.fail; + +public class EventBusTest { + + private static final Logger logger = Logger.getLogger(EventBusTest.class.getName()); + + private static final String EVENT = "Hello"; + + private static final String BUS_IDENTIFIER = "test-bus"; + + private EventBus bus; + + @BeforeEach + public void create() { + bus = new EventBus(BUS_IDENTIFIER); + } + + @Test + public void testPolymorphicDistribution() { + // Three catchers for related types String, Object, and Comparable. + // String isa Object + // String isa Comparable + // Comparable isa Object + StringCatcher stringCatcher = new StringCatcher(); + + final List objectEvents = new ArrayList<>(); + Object objCatcher = + new Object() { + @SuppressWarnings("unused") + @Subscribe + public void eat(Object food) { + objectEvents.add(food); + } + }; + + final List> compEvents = new ArrayList<>(); + Object compCatcher = + new Object() { + @SuppressWarnings("unused") + @Subscribe + public void eat(Comparable food) { + compEvents.add(food); + } + }; + bus.register(stringCatcher); + bus.register(objCatcher); + bus.register(compCatcher); + + // Two additional event types: Object and Comparable (played by Integer) + Object objEvent = new Object(); + Object compEvent = 6; + + bus.post(EVENT); + bus.post(objEvent); + bus.post(compEvent); + + List stringEvents = stringCatcher.getEvents(); + + logger.log(Level.INFO, stringEvents.toString()); + + assertEquals(1, stringEvents.size(), "Only one String should be delivered."); + assertEquals(EVENT, stringEvents.get(0), "Correct string should be delivered."); + + // Check the Catcher... + assertEquals(3, objectEvents.size(), "Three Objects should be delivered."); + assertEquals(EVENT, objectEvents.get(0), "String fixture must be first object delivered."); + assertEquals(objEvent, objectEvents.get(1), "Object fixture must be second object delivered."); + assertEquals(compEvent, objectEvents.get(2), "Comparable fixture must be thirdobject delivered."); + + // Check the Catcher>... + assertEquals(2, compEvents.size(), "Two Comparables should be delivered."); + assertEquals(EVENT, compEvents.get(0), "String fixture must be first comparable delivered."); + assertEquals(compEvent, compEvents.get(1), "Comparable fixture must be second comparable delivered."); + } + + @Test + public void testSubscriberThrowsException() throws Exception { + final RecordingSubscriberExceptionHandler handler = new RecordingSubscriberExceptionHandler(); + final EventBus eventBus = new EventBus(handler); + final RuntimeException exception = + new RuntimeException("but culottes have a tendancy to ride up!"); + final Object subscriber = + new Object() { + @Subscribe + public void throwExceptionOn(String message) { + throw exception; + } + }; + eventBus.register(subscriber); + eventBus.post(EVENT); + + assertNotNull(handler.getContext()); + assertEquals(exception, handler.getException(), "Cause should be available."); + assertEquals(eventBus, handler.getContext().getEventBus(), "EventBus should be available."); + assertEquals(EVENT, handler.getContext().getEvent(), "Event should be available."); + assertEquals(subscriber, handler.getContext().getSubscriber(), "Subscriber should be available."); + assertEquals(subscriber.getClass().getMethod("throwExceptionOn", String.class), + handler.getContext().getSubscriberMethod(),"Method should be available."); + } + + static class RecordingSubscriberExceptionHandler implements SubscriberExceptionHandler { + + private SubscriberExceptionContext context; + + private Throwable exception; + + public RecordingSubscriberExceptionHandler() { + } + + @Override + public void handleException(Throwable exception, SubscriberExceptionContext context) { + this.exception = exception; + this.context = context; + } + + public SubscriberExceptionContext getContext() { + return context; + } + + public Throwable getException() { + return exception; + } + } + + @Test + public void testSubscriberThrowsExceptionHandlerThrowsException() throws Exception { + final EventBus eventBus = new EventBus((exception, context) -> { + throw new RuntimeException(); + }); + final Object subscriber = + new Object() { + @Subscribe + public void throwExceptionOn(String message) { + throw new RuntimeException(); + } + }; + eventBus.register(subscriber); + try { + eventBus.post(EVENT); + } catch (RuntimeException e) { + fail("Exception should not be thrown."); + } + } + + @Test + public void testDeadEventForwarding() { + GhostCatcher catcher = new GhostCatcher(); + bus.register(catcher); + bus.post(EVENT); + List events = catcher.getEvents(); + assertEquals(1, events.size(), "One dead event should be delivered."); + assertEquals(EVENT, events.get(0).getEvent(), "The dead event should wrap the original event."); + } + + static class GhostCatcher { + + private final List events = new ArrayList<>(); + + @Subscribe + public void ohNoesIHaveDied(DeadEvent event) { + events.add(event); + } + + public List getEvents() { + return events; + } + } + + + @Test + public void testMissingSubscribe() { + bus.register(new Object()); + } + + @Test + public void testUnregister() { + StringCatcher catcher1 = new StringCatcher(); + StringCatcher catcher2 = new StringCatcher(); + try { + bus.unregister(catcher1); + fail("Attempting to unregister an unregistered object succeeded"); + } catch (IllegalArgumentException expected) { + // OK. + } + bus.register(catcher1); + bus.post(EVENT); + bus.register(catcher2); + bus.post(EVENT); + List expectedEvents = new ArrayList<>(); + expectedEvents.add(EVENT); + expectedEvents.add(EVENT); + assertEquals(expectedEvents, catcher1.getEvents(), "Two correct events should be delivered."); + assertEquals(List.of(EVENT), catcher2.getEvents(), "One correct event should be delivered."); + bus.unregister(catcher1); + bus.post(EVENT); + assertEquals(expectedEvents, catcher1.getEvents(), "Shouldn't catch any more events when unregistered."); + assertEquals(expectedEvents, catcher2.getEvents(), "Two correct events should be delivered."); + try { + bus.unregister(catcher1); + fail("Attempting to unregister an unregistered object succeeded"); + } catch (IllegalArgumentException expected) { + // OK. + } + bus.unregister(catcher2); + bus.post(EVENT); + assertEquals(expectedEvents, catcher1.getEvents(), "Shouldn't catch any more events when unregistered."); + assertEquals(expectedEvents, catcher2.getEvents(), "Shouldn't catch any more events when unregistered."); + } + + // NOTE: This test will always pass if register() is thread-safe but may also + // pass if it isn't, though this is unlikely. + @Test + public void testRegisterThreadSafety() throws Exception { + List catchers = new CopyOnWriteArrayList<>(); + List> futures = new ArrayList<>(); + ExecutorService executor = Executors.newFixedThreadPool(10); + int numberOfCatchers = 10000; + for (int i = 0; i < numberOfCatchers; i++) { + futures.add(executor.submit(new Registrator(bus, catchers))); + } + for (int i = 0; i < numberOfCatchers; i++) { + futures.get(i).get(); + } + assertEquals(numberOfCatchers, catchers.size(), "Unexpected number of catchers in the list"); + bus.post(EVENT); + List expectedEvents = List.of(EVENT); + for (StringCatcher catcher : catchers) { + assertEquals(expectedEvents, catcher.getEvents(), "One of the registered catchers did not receive an event."); + } + } + + /** Runnable which registers a StringCatcher on an event bus and adds it to a list. */ + private static class Registrator implements Runnable { + private final EventBus bus; + private final List catchers; + + Registrator(EventBus bus, List catchers) { + this.bus = bus; + this.catchers = catchers; + } + + @Override + public void run() { + StringCatcher catcher = new StringCatcher(); + bus.register(catcher); + catchers.add(catcher); + } + } + + @Test + public void testToString() { + EventBus eventBus = new EventBus("a b ; - \" < > / \\ €"); + assertEquals("EventBus{a b ; - \" < > / \\ €}", eventBus.toString()); + } + + /** + * Tests that bridge methods are not subscribed to events. In Java 8, annotations are included on + * the bridge method in addition to the original method, which causes both the original and bridge + * methods to be subscribed (since both are annotated @Subscribe) without specifically checking + * for bridge methods. + */ + @Test + public void testRegistrationWithBridgeMethod() { + final AtomicInteger calls = new AtomicInteger(); + bus.register( + new Callback() { + @Subscribe + @Override + public void call(String s) { + calls.incrementAndGet(); + } + }); + bus.post("hello"); + assertEquals(1, calls.get()); + } + + private interface Callback { + void call(T t); + } + + @Test + public void testPrimitiveSubscribeFails() { + class SubscribesToPrimitive { + @Subscribe + public void toInt(int i) {} + } + try { + bus.register(new SubscribesToPrimitive()); + fail("should have thrown"); + } catch (IllegalArgumentException expected) { + } + } +} diff --git a/src/test/java/org/xbib/event/bus/ReentrantEventsTest.java b/src/test/java/org/xbib/event/bus/ReentrantEventsTest.java new file mode 100644 index 0000000..725c9d9 --- /dev/null +++ b/src/test/java/org/xbib/event/bus/ReentrantEventsTest.java @@ -0,0 +1,74 @@ +package org.xbib.event.bus; + +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Validate that {@link EventBus} behaves carefully when listeners publish their own events. + */ +public class ReentrantEventsTest { + + static final String FIRST = "one"; + static final Double SECOND = 2.0d; + + final EventBus bus = new EventBus(); + + @Test + public void testNoReentrantEvents() { + ReentrantEventsHater hater = new ReentrantEventsHater(); + bus.register(hater); + bus.post(FIRST); + assertEquals(List.of(FIRST, SECOND), hater.eventsReceived, "ReentrantEventHater expected 2 events"); + } + + public class ReentrantEventsHater { + boolean ready = true; + List eventsReceived = new ArrayList<>(); + + @Subscribe + public void listenForStrings(String event) { + eventsReceived.add(event); + ready = false; + try { + bus.post(SECOND); + } finally { + ready = true; + } + } + + @Subscribe + public void listenForDoubles(Double event) { + assertTrue(ready, "I received an event when I wasn't ready!"); + eventsReceived.add(event); + } + } + + @Test + public void testEventOrderingIsPredictable() { + EventProcessor processor = new EventProcessor(); + bus.register(processor); + EventRecorder recorder = new EventRecorder(); + bus.register(recorder); + bus.post(FIRST); + assertEquals(List.of(FIRST, SECOND), recorder.eventsReceived, "EventRecorder expected events in order"); + } + + class EventProcessor { + @Subscribe + public void listenForStrings(String event) { + bus.post(SECOND); + } + } + + static class EventRecorder { + List eventsReceived = new ArrayList<>(); + + @Subscribe + public void listenForEverything(Object event) { + eventsReceived.add(event); + } + } +} diff --git a/src/test/java/org/xbib/event/bus/StringCatcher.java b/src/test/java/org/xbib/event/bus/StringCatcher.java new file mode 100644 index 0000000..fabcc02 --- /dev/null +++ b/src/test/java/org/xbib/event/bus/StringCatcher.java @@ -0,0 +1,30 @@ +package org.xbib.event.bus; + +import java.util.ArrayList; +import java.util.List; +import static org.junit.jupiter.api.Assertions.fail; + +/** + * A simple EventSubscriber mock that records Strings. + * + *

For testing fun, also includes a landmine method that EventBus tests are required not + * to call ({@link #methodWithoutAnnotation(String)}). + * + */ +public class StringCatcher { + + private final List events = new ArrayList<>(); + + @Subscribe + public void hereHaveAString(String string) { + events.add(string); + } + + public void methodWithoutAnnotation(String string) { + fail("Event bus must not call methods without @Subscribe!"); + } + + public List getEvents() { + return events; + } +}