remove guava dependency, moved event bus into org.xbib.event.bus, update to gradle 7.5

This commit is contained in:
Jörg Prante 2022-08-27 19:02:36 +02:00
parent fbca132aeb
commit d917797648
43 changed files with 3028 additions and 124 deletions

View file

@ -11,3 +11,11 @@ by Fernando Miguel Gamboa de Carvalho (fmcarvalho)
branched as of 23 December, 2021 branched as of 23 December, 2021
License: Apache 2.0 License: Apache 2.0
The work in org.xbib.event.bus is taken from Guava
https://github.com/google/guava
as of 27 August, 2022
License: Apache 2.0

View file

@ -7,6 +7,7 @@ wrapper {
gradleVersion = libs.versions.gradle.get() gradleVersion = libs.versions.gradle.get()
distributionType = Wrapper.DistributionType.ALL distributionType = Wrapper.DistributionType.ALL
} }
ext { ext {
user = 'jprante' user = 'jprante'
name = 'event' name = 'event'
@ -32,8 +33,9 @@ apply from: rootProject.file('gradle/publishing/sonatype.gradle')
dependencies { dependencies {
api libs.settings.api api libs.settings.api
implementation libs.guava implementation libs.net
implementation libs.time implementation libs.time
implementation libs.datastructures.common
implementation libs.datastructures.json.tiny implementation libs.datastructures.json.tiny
implementation libs.reactivestreams implementation libs.reactivestreams
testImplementation libs.rxjava3 testImplementation libs.rxjava3

View file

@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-7.4.2-all.zip distributionUrl=https\://services.gradle.org/distributions/gradle-7.5-all.zip
zipStoreBase=GRADLE_USER_HOME zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists zipStorePath=wrapper/dists

6
gradlew vendored
View file

@ -205,6 +205,12 @@ set -- \
org.gradle.wrapper.GradleWrapperMain \ org.gradle.wrapper.GradleWrapperMain \
"$@" "$@"
# Stop when "xargs" is not available.
if ! command -v xargs >/dev/null 2>&1
then
die "xargs is not available"
fi
# Use "xargs" to parse quoted args. # Use "xargs" to parse quoted args.
# #
# With -n1 it outputs one arg per line, with the quotes and backslashes removed. # With -n1 it outputs one arg per line, with the quotes and backslashes removed.

180
gradlew.bat vendored
View file

@ -1,89 +1,91 @@
@rem @rem
@rem Copyright 2015 the original author or authors. @rem Copyright 2015 the original author or authors.
@rem @rem
@rem Licensed under the Apache License, Version 2.0 (the "License"); @rem Licensed under the Apache License, Version 2.0 (the "License");
@rem you may not use this file except in compliance with the License. @rem you may not use this file except in compliance with the License.
@rem You may obtain a copy of the License at @rem You may obtain a copy of the License at
@rem @rem
@rem https://www.apache.org/licenses/LICENSE-2.0 @rem https://www.apache.org/licenses/LICENSE-2.0
@rem @rem
@rem Unless required by applicable law or agreed to in writing, software @rem Unless required by applicable law or agreed to in writing, software
@rem distributed under the License is distributed on an "AS IS" BASIS, @rem distributed under the License is distributed on an "AS IS" BASIS,
@rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@rem See the License for the specific language governing permissions and @rem See the License for the specific language governing permissions and
@rem limitations under the License. @rem limitations under the License.
@rem @rem
@if "%DEBUG%" == "" @echo off @if "%DEBUG%"=="" @echo off
@rem ########################################################################## @rem ##########################################################################
@rem @rem
@rem Gradle startup script for Windows @rem Gradle startup script for Windows
@rem @rem
@rem ########################################################################## @rem ##########################################################################
@rem Set local scope for the variables with windows NT shell @rem Set local scope for the variables with windows NT shell
if "%OS%"=="Windows_NT" setlocal if "%OS%"=="Windows_NT" setlocal
set DIRNAME=%~dp0 set DIRNAME=%~dp0
if "%DIRNAME%" == "" set DIRNAME=. if "%DIRNAME%"=="" set DIRNAME=.
set APP_BASE_NAME=%~n0 set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME% set APP_HOME=%DIRNAME%
@rem Resolve any "." and ".." in APP_HOME to make it shorter. @rem Resolve any "." and ".." in APP_HOME to make it shorter.
for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi
@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. @rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m" set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"
@rem Find java.exe @rem Find java.exe
if defined JAVA_HOME goto findJavaFromJavaHome if defined JAVA_HOME goto findJavaFromJavaHome
set JAVA_EXE=java.exe set JAVA_EXE=java.exe
%JAVA_EXE% -version >NUL 2>&1 %JAVA_EXE% -version >NUL 2>&1
if "%ERRORLEVEL%" == "0" goto execute if %ERRORLEVEL% equ 0 goto execute
echo. echo.
echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH.
echo. echo.
echo Please set the JAVA_HOME variable in your environment to match the echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation. echo location of your Java installation.
goto fail goto fail
:findJavaFromJavaHome :findJavaFromJavaHome
set JAVA_HOME=%JAVA_HOME:"=% set JAVA_HOME=%JAVA_HOME:"=%
set JAVA_EXE=%JAVA_HOME%/bin/java.exe set JAVA_EXE=%JAVA_HOME%/bin/java.exe
if exist "%JAVA_EXE%" goto execute if exist "%JAVA_EXE%" goto execute
echo. echo.
echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME%
echo. echo.
echo Please set the JAVA_HOME variable in your environment to match the echo Please set the JAVA_HOME variable in your environment to match the
echo location of your Java installation. echo location of your Java installation.
goto fail goto fail
:execute :execute
@rem Setup the command line @rem Setup the command line
set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar
@rem Execute Gradle @rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %* "%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %*
:end :end
@rem End local scope for the variables with windows NT shell @rem End local scope for the variables with windows NT shell
if "%ERRORLEVEL%"=="0" goto mainEnd if %ERRORLEVEL% equ 0 goto mainEnd
:fail :fail
rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of
rem the _cmd.exe /c_ return code! rem the _cmd.exe /c_ return code!
if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 set EXIT_CODE=%ERRORLEVEL%
exit /b 1 if %EXIT_CODE% equ 0 set EXIT_CODE=1
if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE%
:mainEnd exit /b %EXIT_CODE%
if "%OS%"=="Windows_NT" endlocal
:mainEnd
:omega if "%OS%"=="Windows_NT" endlocal
:omega

View file

@ -1,17 +1,22 @@
dependencyResolutionManagement { dependencyResolutionManagement {
versionCatalogs { versionCatalogs {
libs { libs {
version('gradle', '7.4.2') version('gradle', '7.5')
version('groovy', '3.0.10')
version('junit', '5.8.2') version('junit', '5.8.2')
version('datastructures', '1.0.0')
version('net', '3.0.0')
version('content', '5.0.0')
library('junit-jupiter-api', 'org.junit.jupiter', 'junit-jupiter-api').versionRef('junit') library('junit-jupiter-api', 'org.junit.jupiter', 'junit-jupiter-api').versionRef('junit')
library('junit-jupiter-params', 'org.junit.jupiter', 'junit-jupiter-params').versionRef('junit') library('junit-jupiter-params', 'org.junit.jupiter', 'junit-jupiter-params').versionRef('junit')
library('junit-jupiter-engine', 'org.junit.jupiter', 'junit-jupiter-engine').versionRef('junit') library('junit-jupiter-engine', 'org.junit.jupiter', 'junit-jupiter-engine').versionRef('junit')
library('junit4', 'junit', 'junit').version('4.13.2')
library('hamcrest', 'org.hamcrest', 'hamcrest-library').version('2.2') library('hamcrest', 'org.hamcrest', 'hamcrest-library').version('2.2')
library('settings-api', 'org.xbib', 'settings-api').version('4.0.0') library('junit4', 'junit', 'junit').version('4.13.2')
library('guava', 'org.xbib', 'guava').version('30.1') library('settings-api', 'org.xbib', 'settings-api').versionRef('content')
library('time', 'org.xbib', 'time').version('2.1.1') library('net', 'org.xbib', 'net').versionRef('net')
library('datastructures-json-tiny', 'org.xbib', 'datastructures-json-tiny').version('1.0.0') library('datastructures-common', 'org.xbib', 'datastructures-common').versionRef('datastructures')
library('datastructures-json-tiny', 'org.xbib', 'datastructures-json-tiny').versionRef('datastructures')
library('time', 'org.xbib', 'time').version('2.3.0')
library('reactivestreams', 'org.reactivestreams', 'reactive-streams').version('1.0.3') library('reactivestreams', 'org.reactivestreams', 'reactive-streams').version('1.0.3')
library('rxjava3', 'io.reactivex.rxjava3', 'rxjava').version('3.0.3') library('rxjava3', 'io.reactivex.rxjava3', 'rxjava').version('3.0.3')
} }

View file

@ -1,20 +1,18 @@
module org.xbib.event { module org.xbib.event {
exports org.xbib.event; exports org.xbib.event;
exports org.xbib.event.async; exports org.xbib.event.async;
exports org.xbib.event.bus;
exports org.xbib.event.clock; exports org.xbib.event.clock;
exports org.xbib.event.persistence; exports org.xbib.event.persistence;
exports org.xbib.event.queue; exports org.xbib.event.queue;
exports org.xbib.event.queue.path.simple; exports org.xbib.event.syslog;
exports org.xbib.event.queue.path.watch;
exports org.xbib.event.timer;
exports org.xbib.event.yield; exports org.xbib.event.yield;
exports org.xbib.event.yield.async; requires org.xbib.datastructures.api;
exports org.xbib.event.yield.boxes; requires org.xbib.datastructures.common;
exports org.xbib.event.yield.ops;
requires org.xbib.guava;
requires org.xbib.settings.api;
requires org.xbib.time;
requires org.xbib.datastructures.json.tiny; requires org.xbib.datastructures.json.tiny;
requires org.reactivestreams; requires transitive org.xbib.settings.api;
requires org.xbib.time;
requires org.xbib.net;
requires transitive org.reactivestreams;
requires java.logging; requires java.logging;
} }

View file

@ -1,4 +1,7 @@
package org.xbib.event; package org.xbib.event;
public class EventService { public class EventService {
public EventService() {
}
} }

View file

@ -1,4 +1,7 @@
package org.xbib.event; package org.xbib.event;
public class FileFollowEvent { public class FileFollowEvent {
public FileFollowEvent() {
}
} }

View file

@ -27,6 +27,9 @@ public abstract class AbstractAsyncFileReaderLines {
private boolean cancelled = false; private boolean cancelled = false;
public AbstractAsyncFileReaderLines() {
}
protected abstract void onError(Throwable error); protected abstract void onError(Throwable error);
protected abstract void onComplete(); protected abstract void onComplete();

View file

@ -0,0 +1,17 @@
package org.xbib.event.bus;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Marks an event subscriber method as being thread-safe. This annotation indicates that EventBus
* may invoke the event subscriber simultaneously from multiple threads.
*
* <p>This does not mark the method, and so should be used in combination with {@link Subscribe}.
*
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface AllowConcurrentEvents {}

View file

@ -0,0 +1,44 @@
package org.xbib.event.bus;
import java.util.concurrent.Executor;
/**
* An {@link EventBus} that takes the Executor of your choice and uses it to dispatch events,
* allowing dispatch to occur asynchronously.
*/
public class AsyncEventBus extends EventBus {
/**
* Creates a new AsyncEventBus that will use {@code executor} to dispatch events. Assigns {@code
* identifier} as the bus's name for logging purposes.
*
* @param identifier short name for the bus, for logging purposes.
* @param executor Executor to use to dispatch events. It is the caller's responsibility to shut
* down the executor after the last event has been posted to this event bus.
*/
public AsyncEventBus(String identifier, Executor executor) {
super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}
/**
* Creates a new AsyncEventBus that will use {@code executor} to dispatch events.
*
* @param executor Executor to use to dispatch events. It is the caller's responsibility to shut
* down the executor after the last event has been posted to this event bus.
* @param subscriberExceptionHandler Handler used to handle exceptions thrown from subscribers.
* See {@link SubscriberExceptionHandler} for more information.
*/
public AsyncEventBus(Executor executor, SubscriberExceptionHandler subscriberExceptionHandler) {
super("default", executor, Dispatcher.legacyAsync(), subscriberExceptionHandler);
}
/**
* Creates a new AsyncEventBus that will use {@code executor} to dispatch events.
*
* @param executor Executor to use to dispatch events. It is the caller's responsibility to shut
* down the executor after the last event has been posted to this event bus.
*/
public AsyncEventBus(Executor executor) {
super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}
}

View file

@ -0,0 +1,52 @@
package org.xbib.event.bus;
import java.util.Objects;
/**
* Wraps an event that was posted, but which had no subscribers and thus could not be delivered.
*
* <p>Registering a DeadEvent subscriber is useful for debugging or logging, as it can detect
* misconfigurations in a system's event distribution.
*/
public class DeadEvent {
private final Object source;
private final Object event;
/**
* Creates a new DeadEvent.
*
* @param source object broadcasting the DeadEvent (generally the {@link EventBus}).
* @param event the event that could not be delivered.
*/
public DeadEvent(Object source, Object event) {
this.source = Objects.requireNonNull(source);
this.event = Objects.requireNonNull(event);
}
/**
* Returns the object that originated this event (<em>not</em> the object that originated the
* wrapped event). This is generally an {@link EventBus}.
*
* @return the source of this event.
*/
public Object getSource() {
return source;
}
/**
* Returns the wrapped, 'dead' event, which the system was unable to deliver to any registered
* subscriber.
*
* @return the 'dead' event that could not be delivered.
*/
public Object getEvent() {
return event;
}
@Override
public String toString() {
return "DeadEvent{source=" + source + ",event=" + event + "}";
}
}

View file

@ -0,0 +1,177 @@
package org.xbib.event.bus;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Handler for dispatching events to subscribers, providing different event ordering guarantees that
* make sense for different situations.
*
* <p><b>Note:</b> The dispatcher is orthogonal to the subscriber's {@code Executor}. The dispatcher
* controls the order in which events are dispatched, while the executor controls how (i.e. on which
* thread) the subscriber is actually called when an event is dispatched to it.
*
*/
public abstract class Dispatcher {
private Dispatcher() {
}
/**
* Returns a dispatcher that queues events that are posted reentrantly on a thread that is already
* dispatching an event, guaranteeing that all events posted on a single thread are dispatched to
* all subscribers in the order they are posted.
*
* <p>When all subscribers are dispatched to using a <i>direct</i> executor (which dispatches on
* the same thread that posts the event), this yields a breadth-first dispatch order on each
* thread. That is, all subscribers to a single event A will be called before any subscribers to
* any events B and C that are posted to the event bus by the subscribers to A.
*/
public static Dispatcher perThreadDispatchQueue() {
return new PerThreadQueuedDispatcher();
}
/**
* Returns a dispatcher that queues events that are posted in a single global queue. This behavior
* matches the original behavior of AsyncEventBus exactly, but is otherwise not especially useful.
* For async dispatch, an {@linkplain #immediate() immediate} dispatcher should generally be
* preferable.
*/
public static Dispatcher legacyAsync() {
return new LegacyAsyncDispatcher();
}
/**
* Returns a dispatcher that dispatches events to subscribers immediately as they're posted
* without using an intermediate queue to change the dispatch order. This is effectively a
* depth-first dispatch order, vs. breadth-first when using a queue.
*/
public static Dispatcher immediate() {
return ImmediateDispatcher.INSTANCE;
}
/** Dispatches the given {@code event} to the given {@code subscribers}. */
public abstract void dispatch(Object event, Iterator<Subscriber> subscribers);
/** Implementation of a {@link #perThreadDispatchQueue()} dispatcher. */
private static final class PerThreadQueuedDispatcher extends Dispatcher {
// This dispatcher matches the original dispatch behavior of EventBus.
/** Per-thread queue of events to dispatch. */
private final ThreadLocal<Queue<Event>> queue =
new ThreadLocal<Queue<Event>>() {
@Override
protected Queue<Event> initialValue() {
return new ArrayDeque<>();
}
};
/** Per-thread dispatch state, used to avoid reentrant event dispatching. */
private final ThreadLocal<Boolean> dispatching =
new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return false;
}
};
@Override
public void dispatch(Object event, Iterator<Subscriber> subscribers) {
Objects.requireNonNull(event);
Objects.requireNonNull(subscribers);
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
private static final class Event {
private final Object event;
private final Iterator<Subscriber> subscribers;
private Event(Object event, Iterator<Subscriber> subscribers) {
this.event = event;
this.subscribers = subscribers;
}
}
}
/** Implementation of a {@link #legacyAsync()} dispatcher. */
private static final class LegacyAsyncDispatcher extends Dispatcher {
// This dispatcher matches the original dispatch behavior of AsyncEventBus.
//
// We can't really make any guarantees about the overall dispatch order for this dispatcher in
// a multithreaded environment for a couple reasons:
//
// 1. Subscribers to events posted on different threads can be interleaved with each other
// freely. (A event on one thread, B event on another could yield any of
// [a1, a2, a3, b1, b2], [a1, b2, a2, a3, b2], [a1, b2, b3, a2, a3], etc.)
// 2. It's possible for subscribers to actually be dispatched to in a different order than they
// were added to the queue. It's easily possible for one thread to take the head of the
// queue, immediately followed by another thread taking the next element in the queue. That
// second thread can then dispatch to the subscriber it took before the first thread does.
//
// All this makes me really wonder if there's any value in queueing here at all. A dispatcher
// that simply loops through the subscribers and dispatches the event to each would actually
// probably provide a stronger order guarantee, though that order would obviously be different
// in some cases.
/** Global event queue. */
private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
new ConcurrentLinkedQueue<>();
@Override
public void dispatch(Object event, Iterator<Subscriber> subscribers) {
Objects.requireNonNull(event);
while (subscribers.hasNext()) {
queue.add(new EventWithSubscriber(event, subscribers.next()));
}
EventWithSubscriber e;
while ((e = queue.poll()) != null) {
e.subscriber.dispatchEvent(e.event);
}
}
private static final class EventWithSubscriber {
private final Object event;
private final Subscriber subscriber;
private EventWithSubscriber(Object event, Subscriber subscriber) {
this.event = event;
this.subscriber = subscriber;
}
}
}
/** Implementation of {@link #immediate()}. */
private static final class ImmediateDispatcher extends Dispatcher {
private static final ImmediateDispatcher INSTANCE = new ImmediateDispatcher();
@Override
public void dispatch(Object event, Iterator<Subscriber> subscribers) {
Objects.requireNonNull(event);
while (subscribers.hasNext()) {
subscribers.next().dispatchEvent(event);
}
}
}
}

View file

@ -0,0 +1,229 @@
package org.xbib.event.bus;
import java.lang.reflect.Method;
import java.util.Iterator;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Dispatches events to listeners, and provides ways for listeners to register themselves.
*
* <p>The EventBus allows publish-subscribe-style communication between components without requiring
* the components to explicitly register with one another (and thus be aware of each other). It is
* designed exclusively to replace traditional Java in-process event distribution using explicit
* registration. It is <em>not</em> a general-purpose publish-subscribe system, nor is it intended
* for interprocess communication.
*
* <h2>Receiving Events</h2>
*
* <p>To receive events, an object should:
*
* <ol>
* <li>Expose a public method, known as the <i>event subscriber</i>, which accepts a single
* argument of the type of event desired;
* <li>Mark it with a {@link Subscribe} annotation;
* <li>Pass itself to an EventBus instance's {@link #register(Object)} method.
* </ol>
*
* <h2>Posting Events</h2>
*
* <p>To post an event, simply provide the event object to the {@link #post(Object)} method. The
* EventBus instance will determine the type of event and route it to all registered listeners.
*
* <p>Events are routed based on their type &mdash; an event will be delivered to any subscriber for
* any type to which the event is <em>assignable.</em> This includes implemented interfaces, all
* superclasses, and all interfaces implemented by superclasses.
*
* <p>When {@code post} is called, all registered subscribers for an event are run in sequence, so
* subscribers should be reasonably quick. If an event may trigger an extended process (such as a
* database load), spawn a thread or queue it for later. (For a convenient way to do this, use an
* {@link AsyncEventBus}.)
*
* <h2>Subscriber Methods</h2>
*
* <p>Event subscriber methods must accept only one argument: the event.
*
* <p>Subscribers should not, in general, throw. If they do, the EventBus will catch and log the
* exception. This is rarely the right solution for error handling and should not be relied upon; it
* is intended solely to help find problems during development.
*
* <p>The EventBus guarantees that it will not call a subscriber method from multiple threads
* simultaneously, unless the method explicitly allows it by bearing the {@link
* AllowConcurrentEvents} annotation. If this annotation is not present, subscriber methods need not
* worry about being reentrant, unless also called from outside the EventBus.
*
* <h2>Dead Events</h2>
*
* <p>If an event is posted, but no registered subscribers can accept it, it is considered "dead."
* To give the system a second chance to handle dead events, they are wrapped in an instance of
* {@link DeadEvent} and reposted.
*
* <p>If a subscriber for a supertype of all events (such as Object) is registered, no event will
* ever be considered dead, and no DeadEvents will be generated. Accordingly, while DeadEvent
* extends {@link Object}, a subscriber registered to receive any Object will never receive a
* DeadEvent.
*
* <p>This class is safe for concurrent use.
*
* <p>See the Guava User Guide article on <a
* href="https://github.com/google/guava/wiki/EventBusExplained">{@code EventBus}</a>.
*
*/
public class EventBus {
private static final Logger logger = Logger.getLogger(EventBus.class.getName());
private final String identifier;
private final Executor executor;
private final SubscriberExceptionHandler exceptionHandler;
private final SubscriberRegistry subscribers;
private final Dispatcher dispatcher;
public EventBus() {
this("default");
}
/**
* Creates a new EventBus with the given {@code identifier}.
*
* @param identifier a brief name for this bus, for logging purposes. Should be a valid Java
* identifier.
*/
public EventBus(String identifier) {
this(identifier,
Runnable::run,
Dispatcher.perThreadDispatchQueue(),
LoggingHandler.INSTANCE);
}
/**
* Creates a new EventBus with the given {@link SubscriberExceptionHandler}.
*
* @param exceptionHandler Handler for subscriber exceptions.
*/
public EventBus(SubscriberExceptionHandler exceptionHandler) {
this("default",
Runnable::run,
Dispatcher.perThreadDispatchQueue(),
exceptionHandler);
}
public EventBus(String identifier,
Executor executor,
Dispatcher dispatcher,
SubscriberExceptionHandler exceptionHandler) {
this.identifier = Objects.requireNonNull(identifier);
this.executor = Objects.requireNonNull(executor);
this.dispatcher = Objects.requireNonNull(dispatcher);
this.exceptionHandler = Objects.requireNonNull(exceptionHandler);
this.subscribers = new SubscriberRegistry(this);
}
/**
* Returns the identifier for this event bus.
*/
public final String identifier() {
return identifier;
}
/** Returns the default executor this event bus uses for dispatching events to subscribers. */
final Executor executor() {
return executor;
}
/** Handles the given exception thrown by a subscriber with the given context. */
void handleSubscriberException(Throwable e, SubscriberExceptionContext context) {
Objects.requireNonNull(e);
Objects.requireNonNull(context);
try {
exceptionHandler.handleException(e, context);
} catch (Throwable e2) {
// if the handler threw an exception... well, just log it
logger.log(Level.SEVERE,
String.format(Locale.ROOT, "Exception %s thrown while handling exception: %s", e2, e),
e2);
}
}
/**
* Registers all subscriber methods on {@code object} to receive events.
*
* @param object object whose subscriber methods should be registered.
*/
public void register(Object object) {
subscribers.register(object);
}
/**
* Unregisters all subscriber methods on a registered {@code object}.
*
* @param object object whose subscriber methods should be unregistered.
* @throws IllegalArgumentException if the object was not previously registered.
*/
public void unregister(Object object) {
subscribers.unregister(object);
}
/**
* Posts an event to all registered subscribers. This method will return successfully after the
* event has been posted to all subscribers, and regardless of any exceptions thrown by
* subscribers.
*
* <p>If no subscribers have been subscribed for {@code event}'s class, and {@code event} is not
* already a {@link DeadEvent}, it will be wrapped in a DeadEvent and reposted.
*
* @param event event to post.
*/
public void post(Object event) {
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
@Override
public String toString() {
return "EventBus{" + identifier + "}";
}
/** Simple logging handler for subscriber exceptions. */
static final class LoggingHandler implements SubscriberExceptionHandler {
static final LoggingHandler INSTANCE = new LoggingHandler();
@Override
public void handleException(Throwable exception, SubscriberExceptionContext context) {
Logger logger = logger(context);
if (logger.isLoggable(Level.SEVERE)) {
logger.log(Level.SEVERE, message(context), exception);
}
}
private static Logger logger(SubscriberExceptionContext context) {
return Logger.getLogger(EventBus.class.getName() + "." + context.getEventBus().identifier());
}
private static String message(SubscriberExceptionContext context) {
Method method = context.getSubscriberMethod();
return "Exception thrown by subscriber method "
+ method.getName()
+ '('
+ method.getParameterTypes()[0].getName()
+ ')'
+ " on subscriber "
+ context.getSubscriber()
+ " when dispatching event: "
+ context.getEvent();
}
}
}

View file

@ -0,0 +1,22 @@
package org.xbib.event.bus;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
* Marks a method as an event subscriber.
*
* <p>The type of event will be indicated by the method's first (and only) parameter, which cannot
* be primitive. If this annotation is applied to methods with zero parameters, or more than one
* parameter, the object containing the method will not be able to register for event delivery from
* the {@link EventBus}.
*
* <p>Unless also annotated with @{@link AllowConcurrentEvents}, event subscriber methods will be
* invoked serially by each event bus that they are registered with.
*
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Subscribe {}

View file

@ -0,0 +1,122 @@
package org.xbib.event.bus;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Objects;
import java.util.concurrent.Executor;
/**
* A subscriber method on a specific object, plus the executor that should be used for dispatching
* events to it.
*
* <p>Two subscribers are equivalent when they refer to the same method on the same object (not
* class). This property is used to ensure that no subscriber method is registered more than once.
*
*/
public class Subscriber {
/** Creates a {@code Subscriber} for {@code method} on {@code listener}. */
static Subscriber create(EventBus bus, Object listener, Method method) {
return isDeclaredThreadSafe(method)
? new Subscriber(bus, listener, method)
: new SynchronizedSubscriber(bus, listener, method);
}
/** The event bus this subscriber belongs to. */
private final EventBus bus;
/** The object with the subscriber method. */
final Object target;
/** Subscriber method. */
private final Method method;
/** Executor to use for dispatching events to this subscriber. */
private final Executor executor;
private Subscriber(EventBus bus, Object target, Method method) {
this.bus = bus;
this.target = Objects.requireNonNull(target);
this.method = method;
method.setAccessible(true);
this.executor = bus.executor();
}
/** Dispatches {@code event} to this subscriber using the proper executor. */
final void dispatchEvent(final Object event) {
executor.execute(() -> {
try {
invokeSubscriberMethod(event);
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
});
}
/**
* Invokes the subscriber method. This method can be overridden to make the invocation
* synchronized.
*/
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
try {
method.invoke(target, Objects.requireNonNull(event));
} catch (IllegalArgumentException e) {
throw new Error("Method rejected target/argument: " + event, e);
} catch (IllegalAccessException e) {
throw new Error("Method became inaccessible: " + event, e);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof Error) {
throw (Error) e.getCause();
}
throw e;
}
}
/** Gets the context for the given event. */
private SubscriberExceptionContext context(Object event) {
return new SubscriberExceptionContext(bus, event, target, method);
}
@Override
public final int hashCode() {
return (31 + method.hashCode()) * 31 + System.identityHashCode(target);
}
@Override
public final boolean equals(Object obj) {
if (obj instanceof Subscriber) {
Subscriber that = (Subscriber) obj;
// Use == so that different equal instances will still receive events.
// We only guard against the case that the same object is registered
// multiple times
return target == that.target && method.equals(that.method);
}
return false;
}
/**
* Checks whether {@code method} is thread-safe, as indicated by the presence of the {@link
* AllowConcurrentEvents} annotation.
*/
private static boolean isDeclaredThreadSafe(Method method) {
return method.getAnnotation(AllowConcurrentEvents.class) != null;
}
/**
* Subscriber that synchronizes invocations of a method to ensure that only one thread may enter
* the method at a time.
*/
static final class SynchronizedSubscriber extends Subscriber {
private SynchronizedSubscriber(EventBus bus, Object target, Method method) {
super(bus, target, method);
}
@Override
void invokeSubscriberMethod(Object event) throws InvocationTargetException {
synchronized (this) {
super.invokeSubscriberMethod(event);
}
}
}
}

View file

@ -0,0 +1,51 @@
package org.xbib.event.bus;
import java.lang.reflect.Method;
import java.util.Objects;
/**
* Context for an exception thrown by a subscriber.
*/
public class SubscriberExceptionContext {
private final EventBus eventBus;
private final Object event;
private final Object subscriber;
private final Method subscriberMethod;
/**
* @param eventBus The {@link EventBus} that handled the event and the subscriber. Useful for
* broadcasting a a new event based on the error.
* @param event The event object that caused the subscriber to throw.
* @param subscriber The source subscriber context.
* @param subscriberMethod the subscribed method.
*/
SubscriberExceptionContext(EventBus eventBus, Object event, Object subscriber, Method subscriberMethod) {
this.eventBus = Objects.requireNonNull(eventBus);
this.event = Objects.requireNonNull(event);
this.subscriber = Objects.requireNonNull(subscriber);
this.subscriberMethod = Objects.requireNonNull(subscriberMethod);
}
/**
* @return The {@link EventBus} that handled the event and the subscriber. Useful for broadcasting
* a a new event based on the error.
*/
public EventBus getEventBus() {
return eventBus;
}
/** @return The event object that caused the subscriber to throw. */
public Object getEvent() {
return event;
}
/** @return The object context that the subscriber was called on. */
public Object getSubscriber() {
return subscriber;
}
/** @return The subscribed method that threw the exception. */
public Method getSubscriberMethod() {
return subscriberMethod;
}
}

View file

@ -0,0 +1,9 @@
package org.xbib.event.bus;
/**
* Handler for exceptions thrown by event subscribers.
*/
public interface SubscriberExceptionHandler {
/** Handles exceptions thrown by subscribers. */
void handleException(Throwable exception, SubscriberExceptionContext context);
}

View file

@ -0,0 +1,186 @@
package org.xbib.event.bus;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import org.xbib.datastructures.common.LinkedHashSetMultiMap;
import org.xbib.datastructures.common.MultiMap;
import org.xbib.event.bus.util.ConcatenatedIterator;
import org.xbib.event.bus.util.TypeToken;
/**
* Registry of subscribers to a single event bus.
*/
final class SubscriberRegistry {
/**
* All registered subscribers, indexed by event type.
*
* <p>The {@link CopyOnWriteArraySet} values make it easy and relatively lightweight to get an
* immutable snapshot of all current subscribers to an event without any locking.
*/
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
new ConcurrentHashMap<>();
/** The event bus this registry belongs to. */
private final EventBus bus;
SubscriberRegistry(EventBus bus) {
this.bus = Objects.requireNonNull(bus);
}
/** Registers all subscriber methods on the given listener object. */
void register(Object listener) {
MultiMap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> eventMethodsInListener = entry.getValue();
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers == null) {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
eventSubscribers = firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
}
eventSubscribers.addAll(eventMethodsInListener);
}
}
/** Unregisters all subscribers on the given listener object. */
void unregister(Object listener) {
MultiMap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey();
Collection<Subscriber> listenerMethodsForType = entry.getValue();
CopyOnWriteArraySet<Subscriber> currentSubscribers = subscribers.get(eventType);
if (currentSubscribers == null || !currentSubscribers.removeAll(listenerMethodsForType)) {
// if removeAll returns true, all we really know is that at least one subscriber was
// removed... however, barring something very strange we can assume that if at least one
// subscriber was removed, all subscribers on listener for that event type were... after
// all, the definition of subscribers on a particular class is totally static
throw new IllegalArgumentException(
"missing event subscriber for an annotated method. Is " + listener + " registered?");
}
// don't try to remove the set if it's empty; that can't be done safely without a lock
// anyway, if the set is empty it'll just be wrapping an array of length 0
}
}
Set<Subscriber> getSubscribersForTesting(Class<?> eventType) {
return firstNonNull(subscribers.get(eventType), new LinkedHashSet<>());
}
/**
* Gets an iterator representing an immutable snapshot of all subscribers to the given event at
* the time this method is called.
*/
Iterator<Subscriber> getSubscribers(Object event) {
Set<Class<?>> eventTypes = flattenHierarchy(event.getClass());
List<Iterator<Subscriber>> subscriberIterators = new ArrayList<>(eventTypes.size());
for (Class<?> eventType : eventTypes) {
CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
if (eventSubscribers != null) {
subscriberIterators.add(eventSubscribers.iterator());
}
}
return new ConcatenatedIterator<>(subscriberIterators.iterator());
}
/**
* Returns all subscribers for the given listener grouped by the type of event they subscribe to.
*/
private MultiMap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
MultiMap<Class<?>, Subscriber> methodsInListener = new LinkedHashSetMultiMap<>();
Class<?> clazz = listener.getClass();
for (Method method : getAnnotatedMethodsNotCached(clazz)) {
Class<?>[] parameterTypes = method.getParameterTypes();
Class<?> eventType = parameterTypes[0];
methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
}
return methodsInListener;
}
private static Collection<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
Map<MethodIdentifier, Method> identifiers = new HashMap<>();
for (Class<?> supertype : supertypes) {
for (Method method : supertype.getDeclaredMethods()) {
if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length != 1) {
throw new IllegalArgumentException(String.format("Method %s has @Subscribe annotation but has %s parameters."
+ "Subscriber methods must have exactly 1 parameter.", method, parameterTypes.length));
}
if (parameterTypes[0].isPrimitive()) {
throw new IllegalArgumentException(String.format("@Subscribe method %s's parameter is %s. "
+ "Subscriber methods cannot accept primitives.",
method,
parameterTypes[0].getName()));
}
MethodIdentifier ident = new MethodIdentifier(method);
if (!identifiers.containsKey(ident)) {
identifiers.put(ident, method);
}
}
}
}
return identifiers.values();
}
/**
* Flattens a class's type hierarchy into a set of {@code Class} objects including all
* superclasses (transitively) and all interfaces implemented by these superclasses.
*/
@SuppressWarnings("unchecked")
static Set<Class<?>> flattenHierarchy(Class<?> concreteClass) {
return (Set<Class<?>>) TypeToken.of(concreteClass).getTypes().rawTypes();
}
private static final class MethodIdentifier {
private final String name;
private final List<Class<?>> parameterTypes;
MethodIdentifier(Method method) {
this.name = method.getName();
this.parameterTypes = Arrays.asList(method.getParameterTypes());
}
@Override
public int hashCode() {
return Objects.hash(name, parameterTypes);
}
@Override
public boolean equals(Object o) {
if (o instanceof MethodIdentifier) {
MethodIdentifier ident = (MethodIdentifier) o;
return name.equals(ident.name) && parameterTypes.equals(ident.parameterTypes);
}
return false;
}
}
private static <T> T firstNonNull(T first, T second) {
if (first != null) {
return first;
}
if (second != null) {
return second;
}
throw new NullPointerException("Both parameters are null");
}
}

View file

@ -0,0 +1,100 @@
package org.xbib.event.bus.util;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;
import static java.util.Collections.emptyIterator;
public class ConcatenatedIterator<T> implements Iterator<T> {
/* The last iterator to return an element. Calls to remove() go to this iterator. */
private Iterator<? extends T> toRemove;
/* The iterator currently returning elements. */
private Iterator<? extends T> iterator;
/*
* We track the "meta iterators," the iterators-of-iterators, below. Usually, topMetaIterator
* is the only one in use, but if we encounter nested concatenations, we start a deque of
* meta-iterators rather than letting the nesting get arbitrarily deep. This keeps each
* operation O(1).
*/
private Iterator<? extends Iterator<? extends T>> topMetaIterator;
// Only becomes nonnull if we encounter nested concatenations.
private Deque<Iterator<? extends Iterator<? extends T>>> metaIterators;
public ConcatenatedIterator(Iterator<? extends Iterator<? extends T>> metaIterator) {
iterator = emptyIterator();
topMetaIterator = metaIterator;
}
// Returns a nonempty meta-iterator or, if all meta-iterators are empty, null.
private Iterator<? extends Iterator<? extends T>> getTopMetaIterator() {
while (topMetaIterator == null || !topMetaIterator.hasNext()) {
if (metaIterators != null && !metaIterators.isEmpty()) {
topMetaIterator = metaIterators.removeFirst();
} else {
return null;
}
}
return topMetaIterator;
}
@Override
public boolean hasNext() {
while (!iterator.hasNext()) {
// this weird checkNotNull positioning appears required by our tests, which expect
// both hasNext and next to throw NPE if an input iterator is null.
topMetaIterator = getTopMetaIterator();
if (topMetaIterator == null) {
return false;
}
iterator = topMetaIterator.next();
if (iterator instanceof ConcatenatedIterator) {
// Instead of taking linear time in the number of nested concatenations, unpack
// them into the queue
@SuppressWarnings("unchecked")
ConcatenatedIterator<T> topConcat = (ConcatenatedIterator<T>) iterator;
iterator = topConcat.iterator;
// topConcat.topMetaIterator, then topConcat.metaIterators, then this.topMetaIterator,
// then this.metaIterators
if (this.metaIterators == null) {
this.metaIterators = new ArrayDeque<>();
}
this.metaIterators.addFirst(this.topMetaIterator);
if (topConcat.metaIterators != null) {
while (!topConcat.metaIterators.isEmpty()) {
this.metaIterators.addFirst(topConcat.metaIterators.removeLast());
}
}
this.topMetaIterator = topConcat.topMetaIterator;
}
}
return true;
}
@Override
public T next() {
if (hasNext()) {
toRemove = iterator;
return iterator.next();
} else {
throw new NoSuchElementException();
}
}
@Override
public void remove() {
if (toRemove != null) {
toRemove.remove();
toRemove = null;
}
}
}

View file

@ -0,0 +1,163 @@
package org.xbib.event.bus.util;
import java.lang.reflect.Array;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.lang.reflect.WildcardType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
public abstract class TypeToken<T> {
private final Type runtimeType;
private TypeToken(Type type) {
this.runtimeType = type;
}
public static <T> TypeToken<T> of(Class<T> type) {
return new SimpleTypeToken<>(type);
}
public static TypeToken<?> of(Type type) {
return new SimpleTypeToken<>(type);
}
public final TypeSet getTypes() {
return new TypeSet();
}
public final Class<? super T> getRawType() {
return getRawTypes().iterator().next();
}
private static final class SimpleTypeToken<T> extends TypeToken<T> {
SimpleTypeToken(Type type) {
super(type);
}
}
private Collection<Class<? super T>> getRawTypes() {
final List<Class<?>> builder = new ArrayList<>();
new TypeVisitor() {
@Override
void visitTypeVariable(TypeVariable<?> t) {
visit(t.getBounds());
}
@Override
void visitWildcardType(WildcardType t) {
visit(t.getUpperBounds());
}
@Override
void visitParameterizedType(ParameterizedType t) {
builder.add((Class<?>) t.getRawType());
}
@Override
void visitClass(Class<?> t) {
builder.add(t);
}
@Override
void visitGenericArrayType(GenericArrayType t) {
builder.add(getArrayClass(of(t.getGenericComponentType()).getRawType()));
}
}.visit(runtimeType);
@SuppressWarnings({"unchecked", "rawtypes"})
Collection<Class<? super T>> result = (Collection) builder;
return result;
}
private abstract static class TypeCollector<K> {
static final TypeCollector<Class<?>> FOR_RAW_TYPE =
new TypeCollector<>() {
@Override
Class<?> getRawType(Class<?> type) {
return type;
}
@Override
Iterable<? extends Class<?>> getInterfaces(Class<?> type) {
return Arrays.asList(type.getInterfaces());
}
@Override
Class<?> getSuperclass(Class<?> type) {
return type.getSuperclass();
}
};
Collection<K> collectTypes(Iterable<? extends K> types) {
Map<K, Integer> map = new HashMap<>();
for (K type : types) {
collectTypes(type, map);
}
@SuppressWarnings("unchecked")
Comparator<Integer> comparator = (Comparator<Integer>) Comparator.naturalOrder().reversed();
return sortKeysByValue(map, comparator);
}
private int collectTypes(K type, Map<? super K, Integer> map) {
Integer existing = map.get(type);
if (existing != null) {
return existing;
}
int aboveMe = getRawType(type).isInterface() ? 1 : 0;
for (K interfaceType : getInterfaces(type)) {
aboveMe = Math.max(aboveMe, collectTypes(interfaceType, map));
}
K superclass = getSuperclass(type);
if (superclass != null) {
aboveMe = Math.max(aboveMe, collectTypes(superclass, map));
}
map.put(type, aboveMe + 1);
return aboveMe + 1;
}
private static <K, V> Collection<K> sortKeysByValue(Map<K, V> map,
Comparator<? super V> valueComparator) {
Comparator<K> keyOrdering =
(left, right) -> valueComparator.compare(map.get(left), map.get(right));
List<K> keys = new ArrayList<>(map.keySet());
keys.sort(keyOrdering);
return keys;
}
abstract Class<?> getRawType(K type);
abstract Iterable<? extends K> getInterfaces(K type);
abstract K getSuperclass(K type);
}
public class TypeSet {
TypeSet() {}
public Set<Class<? super T>> rawTypes() {
@SuppressWarnings({"unchecked", "rawtypes"})
Collection<Class<? super T>> collectedTypes =
(Collection) TypeCollector.FOR_RAW_TYPE.collectTypes(getRawTypes());
return new HashSet<>(collectedTypes);
}
}
static Class<?> getArrayClass(Class<?> componentType) {
return Array.newInstance(componentType, 0).getClass();
}
}

View file

@ -0,0 +1,87 @@
package org.xbib.event.bus.util;
import java.lang.reflect.GenericArrayType;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.lang.reflect.TypeVariable;
import java.lang.reflect.WildcardType;
import java.util.HashSet;
import java.util.Set;
/**
* Based on what a {@link Type} is, dispatch it to the corresponding {@code visit*} method. By
* default, no recursion is done for type arguments or type bounds. But subclasses can opt to do
* recursion by calling {@link #visit} for any {@code Type} while visitation is in progress. For
* example, this can be used to reject wildcards or type variables contained in a type as in:
*
* <pre>{@code
* new TypeVisitor() {
* protected void visitParameterizedType(ParameterizedType t) {
* visit(t.getOwnerType());
* visit(t.getActualTypeArguments());
* }
* protected void visitGenericArrayType(GenericArrayType t) {
* visit(t.getGenericComponentType());
* }
* protected void visitTypeVariable(TypeVariable<?> t) {
* throw new IllegalArgumentException("Cannot contain type variable.");
* }
* protected void visitWildcardType(WildcardType t) {
* throw new IllegalArgumentException("Cannot contain wildcard type.");
* }
* }.visit(type);
* }</pre>
*
* <p>One {@code Type} is visited at most once. The second time the same type is visited, it's
* ignored by {@link #visit}. This avoids infinite recursion caused by recursive type bounds.
*
* <p>This class is <em>not</em> thread safe.
*/
abstract class TypeVisitor {
private final Set<Type> visited = new HashSet<>();
/**
* Visits the given types. Null types are ignored. This allows subclasses to call {@code
* visit(parameterizedType.getOwnerType())} safely without having to check nulls.
*/
public final void visit(Type... types) {
for (Type type : types) {
if (type == null || !visited.add(type)) {
// null owner type, or already visited;
continue;
}
boolean succeeded = false;
try {
if (type instanceof TypeVariable) {
visitTypeVariable((TypeVariable<?>) type);
} else if (type instanceof WildcardType) {
visitWildcardType((WildcardType) type);
} else if (type instanceof ParameterizedType) {
visitParameterizedType((ParameterizedType) type);
} else if (type instanceof Class) {
visitClass((Class<?>) type);
} else if (type instanceof GenericArrayType) {
visitGenericArrayType((GenericArrayType) type);
} else {
throw new AssertionError("Unknown type: " + type);
}
succeeded = true;
} finally {
if (!succeeded) { // When the visitation failed, we don't want to ignore the second.
visited.remove(type);
}
}
}
}
void visitClass(Class<?> t) {}
void visitGenericArrayType(GenericArrayType t) {}
void visitParameterizedType(ParameterizedType t) {}
void visitTypeVariable(TypeVariable<?> t) {}
void visitWildcardType(WildcardType t) {}
}

View file

@ -1,10 +1,10 @@
package org.xbib.event.clock; package org.xbib.event.clock;
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import org.xbib.event.EventConsumer; import org.xbib.event.EventConsumer;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.xbib.event.bus.AllowConcurrentEvents;
import org.xbib.event.bus.Subscribe;
public class ClockEventConsumer implements EventConsumer { public class ClockEventConsumer implements EventConsumer {

View file

@ -1,6 +1,6 @@
package org.xbib.event.clock; package org.xbib.event.clock;
import com.google.common.eventbus.EventBus; import org.xbib.event.bus.EventBus;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
import org.xbib.time.schedule.CronExpression; import org.xbib.time.schedule.CronExpression;
import org.xbib.time.schedule.CronSchedule; import org.xbib.time.schedule.CronSchedule;

View file

@ -1,11 +1,10 @@
package org.xbib.event.clock; package org.xbib.event.clock;
import com.google.common.eventbus.EventBus;
import java.time.Instant; import java.time.Instant;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.xbib.event.bus.EventBus;
public class ClockEventService implements Callable<Integer> { public class ClockEventService implements Callable<Integer> {

View file

@ -2,12 +2,6 @@ package org.xbib.event.queue;
import org.xbib.event.Event; import org.xbib.event.Event;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.FileTime;
import java.time.Instant;
import java.util.Map; import java.util.Map;
public class QueueEvent implements Event { public class QueueEvent implements Event {
@ -16,6 +10,9 @@ public class QueueEvent implements Event {
private Map<String, Object> map; private Map<String, Object> map;
public QueueEvent() {
}
@Override @Override
public void setKey(String key) { public void setKey(String key) {
this.key = key; this.key = key;

View file

@ -1,8 +1,8 @@
package org.xbib.event.queue.path.simple; package org.xbib.event.queue.path.simple;
import com.google.common.eventbus.EventBus;
import org.xbib.datastructures.api.TimeValue; import org.xbib.datastructures.api.TimeValue;
import org.xbib.datastructures.json.tiny.Json; import org.xbib.datastructures.json.tiny.Json;
import org.xbib.event.bus.EventBus;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
import java.io.IOException; import java.io.IOException;

View file

@ -1,6 +1,5 @@
package org.xbib.event.queue.path.simple; package org.xbib.event.queue.path.simple;
import com.google.common.eventbus.EventBus;
import org.xbib.datastructures.json.tiny.Json; import org.xbib.datastructures.json.tiny.Json;
import java.io.IOException; import java.io.IOException;
@ -13,6 +12,7 @@ import java.util.LinkedHashMap;
import java.util.Objects; import java.util.Objects;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.xbib.event.bus.EventBus;
import static org.xbib.event.queue.path.simple.PathQueueEvent.INCOMING; import static org.xbib.event.queue.path.simple.PathQueueEvent.INCOMING;

View file

@ -1,8 +1,8 @@
package org.xbib.event.queue.path.watch; package org.xbib.event.queue.path.watch;
import com.google.common.eventbus.EventBus;
import org.xbib.datastructures.api.TimeValue; import org.xbib.datastructures.api.TimeValue;
import org.xbib.datastructures.json.tiny.Json; import org.xbib.datastructures.json.tiny.Json;
import org.xbib.event.bus.EventBus;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;
import java.io.Closeable; import java.io.Closeable;

View file

@ -1,6 +1,5 @@
package org.xbib.event.queue.path.watch; package org.xbib.event.queue.path.watch;
import com.google.common.eventbus.EventBus;
import org.xbib.datastructures.json.tiny.Json; import org.xbib.datastructures.json.tiny.Json;
import java.io.Closeable; import java.io.Closeable;
@ -20,6 +19,7 @@ import java.util.Objects;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
import org.xbib.event.bus.EventBus;
import static org.xbib.event.queue.path.watch.PathQueueEvent.INCOMING; import static org.xbib.event.queue.path.watch.PathQueueEvent.INCOMING;

View file

@ -0,0 +1,193 @@
package org.xbib.event.syslog;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
/**
* Syslog facility as defined in <a href="https://tools.ietf.org/html/rfc5424">RFC 5424 - The Syslog Protocol</a>.
* See <a href="http://tools.ietf.org/html/rfc5427">RFC 5427 - Textual Conventions for Syslog Management</a> for the {@link #label}.
*/
public enum Facility implements Comparable<Facility> {
/**
* Kernel messages, numerical code 0.
*/
KERN(0, "KERN"),
/**
* User-level messages, numerical code 1.
*/
USER(1, "USER"),
/**
* Mail system, numerical code 2.
*/
MAIL(2, "MAIL"),
/**
* System daemons, numerical code 3.
*/
DAEMON(3, "DAEMON"),
/**
* Security/authorization messages, numerical code 4.
*/
AUTH(4, "AUTH"),
/**
* Messages generated internally by syslogd, numerical code 5.
*/
SYSLOG(5, "SYSLOG"),
/**
* Line printer subsystem, numerical code 6.
*/
LPR(6, "LPR"),
/**
* Network news subsystem, numerical code 7.
*/
NEWS(7, "NEWS"),
/**
* UUCP subsystem, numerical code 8
*/
UUCP(8, "UUCP"),
/**
* Clock daemon, numerical code 9.
*/
CRON(9, "CRON"),
/**
* Security/authorization messages, numerical code 10.
*/
AUTHPRIV(10, "AUTHPRIV"),
/**
* FTP daemon, numerical code 11.
*/
FTP(11, "FTP"),
/**
* NTP subsystem, numerical code 12.
*/
NTP(12, "NTP"),
/**
* Log audit, numerical code 13.
*/
AUDIT(13, "AUDIT"),
/**
* Log alert, numerical code 14.
*/
ALERT(14, "ALERT"),
/**
* Clock daemon, numerical code 15.
*/
CLOCK(15, "CLOCK"),
/**
* Reserved for local use, numerical code 16.
*/
LOCAL0(16, "LOCAL0"),
/**
* Reserved for local use, numerical code 17.
*/
LOCAL1(17, "LOCAL1"),
/**
* Reserved for local use, numerical code 18.
*/
LOCAL2(18, "LOCAL2"),
/**
* Reserved for local use, numerical code 19.
*/
LOCAL3(19, "LOCAL3"),
/**
* Reserved for local use, numerical code 20.
*/
LOCAL4(20, "LOCAL4"),
/**
* Reserved for local use, numerical code 21.
*/
LOCAL5(21, "LOCAL5"),
/**
* Reserved for local use, numerical code 22.
*/
LOCAL6(22, "LOCAL6"),
/**
* Reserved for local use, numerical code 23.
*/
LOCAL7(23, "LOCAL7");
private final static Map<String, Facility> facilityFromLabel = new HashMap<String, Facility>();
private final static Map<Integer, Facility> facilityFromNumericalCode = new HashMap<Integer, Facility>();
static {
for (Facility facility : Facility.values()) {
facilityFromLabel.put(facility.label, facility);
facilityFromNumericalCode.put(facility.numericalCode, facility);
}
}
/**
* Syslog facility numerical code
*/
private final int numericalCode;
/**
* Syslog facility textual code. Not {@code null}
*/
private final String label;
private Facility(int numericalCode, String label) {
this.numericalCode = numericalCode;
this.label = label;
}
/**
* @param numericalCode Syslog facility numerical code
* @return Syslog facility, not {@code null}
* @throws IllegalArgumentException the given numericalCode is not a valid Syslog facility numerical code
*/
public static Facility fromNumericalCode(int numericalCode) throws IllegalArgumentException {
Facility facility = facilityFromNumericalCode.get(numericalCode);
if (facility == null) {
throw new IllegalArgumentException("Invalid facility '" + numericalCode + "'");
}
return facility;
}
/**
* @param label Syslog facility textual code. {@code null} or empty returns {@code null}
* @return Syslog facility, {@code null} if given value is {@code null}
* @throws IllegalArgumentException the given value is not a valid Syslog facility textual code
*/
public static Facility fromLabel(String label) throws IllegalArgumentException {
if (label == null || label.isEmpty()) {
return null;
}
Facility facility = facilityFromLabel.get(label);
if (facility == null) {
throw new IllegalArgumentException("Invalid facility '" + label + "'");
}
return facility;
}
/**
* Syslog facility numerical code
* @return numerical code
*/
public int numericalCode() {
return numericalCode;
}
/**
* Syslog facility textual code. Not {@code null}.
* @return label
*/
public String label() {
return label;
}
/**
* Compare on {@link Facility#numericalCode()}
* @return comparator for facilities
*/
public static Comparator<Facility> comparator() {
return new Comparator<Facility>() {
@Override
public int compare(Facility f1, Facility f2) {
return Integer.compare(f1.numericalCode, f2.numericalCode);
}
};
}
}

View file

@ -0,0 +1,346 @@
package org.xbib.event.syslog;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
class JsonParser {
private static final int DEFAULT_BUFFER_SIZE = 1024;
private final Reader reader;
private final char[] buf;
private int index;
private int fill;
private int ch;
private StringBuilder sb;
private int start;
public JsonParser(Reader reader) {
this(reader, DEFAULT_BUFFER_SIZE);
}
public JsonParser(Reader reader, int buffersize) {
this.reader = reader;
buf = new char[buffersize];
start = -1;
}
public Object parse() throws IOException {
read();
skipBlank();
Object result = parseValue();
skipBlank();
if (ch != -1) {
throw new IOException("unexpected character: " + ch);
}
return result;
}
private Object parseValue() throws IOException {
switch (ch) {
case 'n':
return parseNull();
case 't':
return parseTrue();
case 'f':
return parseFalse();
case '"':
return parseString();
case '[':
return parseList();
case '{':
return parseMap();
case '-':
case '+':
case '0':
case '1':
case '2':
case '3':
case '4':
case '5':
case '6':
case '7':
case '8':
case '9':
return parseNumber();
}
throw new IOException("value");
}
private List<Object> parseList() throws IOException {
read();
List<Object> list = new ArrayList<Object>();
skipBlank();
if (parseChar(']')) {
return list;
}
do {
skipBlank();
list.add(parseValue());
skipBlank();
} while (parseChar(','));
if (!parseChar(']')) {
expected("',' or ']'");
}
return list;
}
private Map<String, Object> parseMap() throws IOException {
read();
Map<String, Object> object = new LinkedHashMap<String, Object>();
skipBlank();
if (parseChar('}')) {
return object;
}
do {
skipBlank();
if (ch != '"') {
expected("name");
}
String name = parseString();
skipBlank();
if (!parseChar(':')) {
expected("':'");
}
skipBlank();
object.put(name, parseValue());
skipBlank();
} while (parseChar(','));
if (!parseChar('}')) {
expected("',' or '}'");
}
return object;
}
private Object parseNull() throws IOException {
read();
checkForChar('u');
checkForChar('l');
checkForChar('l');
return null;
}
private Object parseTrue() throws IOException {
read();
checkForChar('r');
checkForChar('u');
checkForChar('e');
return Boolean.TRUE;
}
private Object parseFalse() throws IOException {
read();
checkForChar('a');
checkForChar('l');
checkForChar('s');
checkForChar('e');
return Boolean.FALSE;
}
private void checkForChar(char ch) throws IOException {
if (!parseChar(ch)) {
expected("'" + ch + "'");
}
}
private String parseString() throws IOException {
read();
startCapture();
while (ch != '"') {
if (ch == '\\') {
pauseCapture();
parseEscaped();
startCapture();
} else if (ch < 0x20) {
expected("valid string character");
} else {
read();
}
}
String s = endCapture();
read();
return s;
}
private void parseEscaped() throws IOException {
read();
switch (ch) {
case '"':
case '/':
case '\\':
sb.append((char) ch);
break;
case 'b':
sb.append('\b');
break;
case 't':
sb.append('\t');
break;
case 'f':
sb.append('\f');
break;
case 'n':
sb.append('\n');
break;
case 'r':
sb.append('\r');
break;
case 'u':
char[] hex = new char[4];
for (int i = 0; i < 4; i++) {
read();
if (!isHexDigit()) {
expected("hexadecimal digit");
}
hex[i] = (char) ch;
}
sb.append((char) Integer.parseInt(String.valueOf(hex), 16));
break;
default:
expected("valid escape sequence");
}
read();
}
private Object parseNumber() throws IOException {
startCapture();
parseChar('-');
int firstDigit = ch;
if (!parseDigit()) {
expected("digit");
}
if (firstDigit != '0') {
while (parseDigit()) {
}
}
parseFraction();
parseExponent();
return endCapture();
}
private boolean parseFraction() throws IOException {
if (!parseChar('.')) {
return false;
}
if (!parseDigit()) {
expected("digit");
}
while (parseDigit()) {
}
return true;
}
private boolean parseExponent() throws IOException {
if (!parseChar('e') && !parseChar('E')) {
return false;
}
if (!parseChar('+')) {
parseChar('-');
}
if (!parseDigit()) {
expected("digit");
}
while (parseDigit()) {
}
return true;
}
private boolean parseChar(char ch) throws IOException {
if (this.ch != ch) {
return false;
}
read();
return true;
}
private boolean parseDigit() throws IOException {
if (!isDigit()) {
return false;
}
read();
return true;
}
private void skipBlank() throws IOException {
while (isWhiteSpace()) {
read();
}
}
private void read() throws IOException {
if (ch == -1) {
throw new IOException("unexpected end of input");
}
if (index == fill) {
if (start != -1) {
sb.append(buf, start, fill - start);
start = 0;
}
fill = reader.read(buf, 0, buf.length);
index = 0;
if (fill == -1) {
ch = -1;
return;
}
}
ch = buf[index++];
}
private void startCapture() {
if (sb == null) {
sb = new StringBuilder();
}
start = index - 1;
}
private void pauseCapture() {
int end = ch == -1 ? index : index - 1;
sb.append(buf, start, end - start);
start = -1;
}
private String endCapture() {
int end = ch == -1 ? index : index - 1;
String captured;
if (sb.length() > 0) {
sb.append(buf, start, end - start);
captured = sb.toString();
sb.setLength(0);
} else {
captured = new String(buf, start, end - start);
}
start = -1;
return captured;
}
private boolean isWhiteSpace() {
return ch == ' ' || ch == '\t' || ch == '\n' || ch == '\r';
}
private boolean isDigit() {
return ch >= '0' && ch <= '9';
}
private boolean isHexDigit() {
return ch >= '0' && ch <= '9'
|| ch >= 'a' && ch <= 'f'
|| ch >= 'A' && ch <= 'F';
}
private void expected(String expected) throws IOException {
if (ch == -1) {
throw new IOException("unexpected end of input");
}
throw new IOException("expected " + expected);
}
}

View file

@ -0,0 +1,231 @@
package org.xbib.event.syslog;
import java.io.IOException;
import java.io.StringReader;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAccessor;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.xbib.datastructures.api.Builder;
/**
* Parses a syslog message with RFC 3164 or RFC 5424 date format
*/
public class MessageParser {
private final static Pattern TWO_SPACES = Pattern.compile(" ");
private final static DateTimeFormatter formatter = DateTimeFormatter.ISO_DATE_TIME;
private final static DateTimeFormatter rfc3164Format =
DateTimeFormatter.ofPattern("MMM d HH:mm:ss").withZone(ZoneId.of("UTC"));
private final static int RFC3164_LEN = 15;
private final static int RFC5424_PREFIX_LEN = 19;
private final DateTimeFormatter timeParser;
private final Map<String, LocalDateTime> timestampCache;
private final Map<String, String> fieldNames = new HashMap<>() {{
put("host", "host");
put("facility", "facility");
put("severity", "severity");
put("timestamp", "timestamp");
put("message", "message");
}};
private Map<String, Pattern> patterns;
public MessageParser() {
timeParser = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss").withZone(ZoneId.of("UTC"));
timestampCache = new LinkedHashMap<>();
/*CacheBuilder.newBuilder().maximumSize(1000).build(
new CacheLoader<>() {
@Override
public LocalDateTime load(String key) {
return LocalDateTime.parse(key, timeParser);
}
});*/
}
public MessageParser setPatterns(Map<String, Pattern> patterns) {
this.patterns = patterns;
return this;
}
public MessageParser setFieldName(String name, String newName) {
fieldNames.put(name, newName);
return this;
}
@SuppressWarnings("unchecked")
public void parseMessage(String msg, Builder builder) throws IOException {
int msgLen = msg.length();
int pos = 0;
if (msg.charAt(pos) != '<') {
throw new IllegalArgumentException("bad format: invalid priority: cannot find open bracket '<' " + msg);
}
int end = msg.indexOf('>');
if (end < 0 || end > 6) {
throw new IllegalArgumentException("bad format: invalid priority: cannot find end bracket '>' " + msg);
}
int pri = Integer.parseInt(msg.substring(1, end));
Facility facility = Facility.fromNumericalCode(pri / 8);
Severity severity = Severity.fromNumericalCode(pri % 8);
builder.field(fieldNames.get("facility"), facility.label())
.field(fieldNames.get("severity"), severity.label());
if (msgLen <= end + 1) {
throw new IllegalArgumentException("bad format: no data except priority " + msg);
}
pos = end + 1;
if (msgLen > pos + 2 && "1 ".equals(msg.substring(pos, pos + 2))) {
pos += 2;
}
TemporalAccessor timestamp;
char ch = msg.charAt(pos);
if (ch == '-') {
timestamp = LocalDateTime.now();
if (msgLen <= pos + 2) {
throw new IllegalArgumentException("bad syslog format (missing hostname)");
}
pos += 2;
} else if (ch >= 'A' && ch <= 'Z') {
if (msgLen <= pos + RFC3164_LEN) {
throw new IllegalArgumentException("bad timestamp format");
}
timestamp = parseRFC3164Time(msg.substring(pos, pos + RFC3164_LEN));
pos += RFC3164_LEN + 1;
} else {
int sp = msg.indexOf(' ', pos);
if (sp == -1) {
throw new IllegalArgumentException("bad timestamp format");
}
timestamp = parseRFC5424Date(msg.substring(pos, sp));
pos = sp + 1;
}
builder.field(fieldNames.get("timestamp"), formatter.format(timestamp));
int ns = msg.indexOf(' ', pos);
if (ns == -1) {
throw new IllegalArgumentException("bad syslog format (missing hostname)");
}
String hostname = msg.substring(pos, ns);
builder.field(fieldNames.get("host"), hostname);
String data;
if (msgLen > ns + 1) {
pos = ns + 1;
data = msg.substring(pos);
} else {
data = msg;
}
try {
if (data.startsWith("@cee:")) {
data = data.substring(5);
}
JsonParser parser = new JsonParser(new StringReader(data));
Map<String,Object> map = (Map<String,Object>)parser.parse();
builder.buildMap(map);
} catch (Throwable t) {
// ignore
}
String message = fieldNames.get("message");
builder.field(message, data);
if (patterns != null) {
for (Map.Entry<String, Pattern> entry : patterns.entrySet()) {
Matcher m = entry.getValue().matcher(data);
if (m.find()) {
builder.field(entry.getKey(), m.group(1));
}
}
}
}
private LocalDateTime parseRFC5424Date(String msg) {
int len = msg.length();
if (len <= RFC5424_PREFIX_LEN) {
throw new IllegalArgumentException("bad format: not a valid RFC5424 timestamp: " + msg);
}
String timestampPrefix = msg.substring(0, RFC5424_PREFIX_LEN);
LocalDateTime timestamp = timestampCache.get(timestampPrefix);
int pos = RFC5424_PREFIX_LEN;
if (timestamp == null) {
throw new IllegalArgumentException("parse error: timestamp is null");
}
if (msg.charAt(pos) == '.') {
boolean found = false;
int end = pos + 1;
if (len <= end) {
throw new IllegalArgumentException("bad timestamp format (no TZ)");
}
while (!found) {
char ch = msg.charAt(end);
if (ch >= '0' && ch <= '9') {
end++;
} else {
found = true;
}
}
if (end - (pos + 1) > 0) {
long milliseconds = (long) (Double.parseDouble(msg.substring(pos, end)) * 1000.0);
timestamp.plus(milliseconds, ChronoUnit.MILLIS);
} else {
throw new IllegalArgumentException("bad format: invalid timestamp (fractional portion): " + msg);
}
pos = end;
}
char ch = msg.charAt(pos);
if (ch != 'Z') {
if (ch == '+' || ch == '-') {
if (len <= pos + 5) {
throw new IllegalArgumentException("bad format: invalid timezone: " + msg);
}
int sign = ch == '+' ? +1 : -1;
char[] hourzone = new char[5];
for (int i = 0; i < 5; i++) {
hourzone[i] = msg.charAt(pos + 1 + i);
}
if (hourzone[0] >= '0' && hourzone[0] <= '9'
&& hourzone[1] >= '0' && hourzone[1] <= '9'
&& hourzone[2] == ':'
&& hourzone[3] >= '0' && hourzone[3] <= '9'
&& hourzone[4] >= '0' && hourzone[4] <= '9') {
int hourOffset = Integer.parseInt(msg.substring(pos + 1, pos + 3));
int minOffset = Integer.parseInt(msg.substring(pos + 4, pos + 6));
timestamp.minus(sign * ((hourOffset * 60L) + minOffset) * 60000, ChronoUnit.MILLIS);
} else {
throw new IllegalArgumentException("bad format: invalid timezone: " + msg);
}
}
}
return timestamp;
}
private LocalDateTime parseRFC3164Time(String timestamp) {
LocalDateTime now = LocalDateTime.now();
int year = now.getYear();
timestamp = TWO_SPACES.matcher(timestamp).replaceFirst(" ");
LocalDateTime date;
try {
date = LocalDateTime.parse(timestamp, rfc3164Format);
} catch (Exception e) {
return LocalDateTime.MIN;
}
LocalDateTime fixed = date.withYear(year);
if (fixed.isAfter(now) && fixed.minusMonths(1).isAfter(now)) {
fixed = date.withYear(year - 1);
} else if (fixed.isBefore(now) && fixed.plusMonths(1).isBefore(now)) {
fixed = date.withYear(year + 1);
}
return fixed;
}
}

View file

@ -0,0 +1,123 @@
package org.xbib.event.syslog;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
/**
* Syslog severity as defined in <a href="https://tools.ietf.org/html/rfc5424">RFC 5424 - The Syslog Protocol</a>.
*/
public enum Severity {
/**
* Emergency: system is unusable, numerical code 0.
*/
EMERGENCY(0, "EMERGENCY"),
/**
* Alert: action must be taken immediately, numerical code 1.
*/
ALERT(1, "ALERT"),
/**
* Critical: critical conditions, numerical code 2.
*/
CRITICAL(2, "CRITICAL"),
/**
* Error: error conditions, numerical code 3.
*/
ERROR(3, "ERROR"),
/**
* Warning: warning conditions, numerical code 4.
*/
WARNING(4, "WARNING"),
/**
* Notice: normal but significant condition, numerical code 5.
*/
NOTICE(5, "NOTICE"),
/**
* Informational: informational messages, numerical code 6.
*/
INFORMATIONAL(6, "INFORMATIONAL"),
/**
* Debug: debug-level messages, numerical code 7.
*/
DEBUG(7, "DEBUG");
private final static Map<String, Severity> severityFromLabel = new HashMap<String, Severity>();
private final static Map<Integer, Severity> severityFromNumericalCode = new HashMap<Integer, Severity>();
static {
for (Severity severity : Severity.values()) {
severityFromLabel.put(severity.label, severity);
severityFromNumericalCode.put(severity.numericalCode, severity);
}
}
private final int numericalCode;
private final String label;
private Severity(int numericalCode, String label) {
this.numericalCode = numericalCode;
this.label = label;
}
/**
* @param numericalCode Syslog severity numerical code
* @return Syslog severity, not {@code null}
* @throws IllegalArgumentException the given numericalCode is not a valid Syslog severity numerical code
*/
public static Severity fromNumericalCode(int numericalCode) throws IllegalArgumentException {
Severity severity = severityFromNumericalCode.get(numericalCode);
if (severity == null) {
throw new IllegalArgumentException("Invalid severity '" + numericalCode + "'");
}
return severity;
}
/**
* @param label Syslog severity textual code. {@code null} or empty returns {@code null}
* @return Syslog severity, {@code null} if given value is {@code null}
* @throws IllegalArgumentException the given value is not a valid Syslog severity textual code
*/
public static Severity fromLabel(String label) throws IllegalArgumentException {
if (label == null || label.isEmpty()) {
return null;
}
Severity severity = severityFromLabel.get(label);
if (severity == null) {
throw new IllegalArgumentException("Invalid severity '" + label + "'");
}
return severity;
}
/**
* Syslog severity numerical code
* @return numerical code
*/
public int numericalCode() {
return numericalCode;
}
/**
* Syslog severity textual code. Not {@code null}.
* @return the severity label
*/
public String label() {
return label;
}
/**
* Compare on {@link Severity#numericalCode()}
* @return comparator for severities
*/
public static Comparator<Severity> comparator() {
return new Comparator<Severity>() {
@Override
public int compare(Severity s1, Severity s2) {
return Integer.compare(s1.numericalCode, s2.numericalCode);
}
};
}
}

View file

@ -0,0 +1,249 @@
package org.xbib.event.syslog;
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.format.DateTimeFormatter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import org.xbib.datastructures.api.Builder;
import org.xbib.datastructures.api.ByteSizeUnit;
import org.xbib.datastructures.api.ByteSizeValue;
import org.xbib.datastructures.api.TimeValue;
import org.xbib.settings.Settings;
public class SyslogService {
/*
private static final Logger logger = Logger.getLogger(SyslogService.class.getName());
private final static String SYSLOG_HOST = "syslog.host";
private final static String SYSLOG_PORT = "syslog.port";
private final static String SYSLOG_RECEIVE_BUFFER_SIZE = "receive_buffer_size";
private final static String SYSLOG_PATTERNS = "patterns";
private final static String SYSLOG_FIELD_NAMES = "field_names";
private final String host;
private final String port;
private final ByteSizeValue receiveBufferSize;
private final ReceiveBufferSizePredictorFactory receiveBufferSizePredictorFactory;
private final MessageParser messageParser;
private DateTimeFormatter formatter;
private ConnectionlessBootstrap udpBootstrap;
private ServerBootstrap tcpBootstrap;
private Channel udpChannel;
private Channel tcpChannel;
public SyslogService(Settings settings) {
this.host = settings.get(SYSLOG_HOST, "127.0.0.1");
this.port = settings.get(SYSLOG_PORT, "9500-9600");
this.receiveBufferSize = settings.getAsBytesSize(SYSLOG_RECEIVE_BUFFER_SIZE, new ByteSizeValue(10, ByteSizeUnit.MB));
this.receiveBufferSizePredictorFactory = new FixedReceiveBufferSizePredictorFactory(receiveBufferSize.bytesAsInt());
Map<String, Object> map = (Map<String, Object>) settings.getAsStructuredMap().get(SYSLOG_PATTERNS);
Map<String, Pattern> patterns = new HashMap<>();
if (map != null) {
for (String key : map.keySet()) {
patterns.put(key, Pattern.compile((String) map.get(key)));
}
}
this.messageParser = new MessageParser().setPatterns(patterns);
map = (Map<String, Object>) settings.getAsStructuredMap().get(SYSLOG_FIELD_NAMES);
if (map != null) {
for (String key : map.keySet()) {
messageParser.setFieldName(key, (String) map.get(key));
}
}
logger.info("syslog server: host [" + host + "], port [" + port + "]");
}
protected void doStart() throws Exception {
initializeUDP();
initializeTCP();
logger.info("syslog server up");
}
protected void doStop() throws ElasticsearchException {
if (udpChannel != null) {
udpChannel.close().awaitUninterruptibly();
}
if (udpBootstrap != null) {
udpBootstrap.releaseExternalResources();
}
if (tcpChannel != null) {
tcpChannel.close().awaitUninterruptibly();
}
if (tcpBootstrap != null) {
tcpBootstrap.releaseExternalResources();
}
bulkProcessor.close();
logger.info("syslog server down");
}
private void initializeUDP() {
udpBootstrap = new ConnectionlessBootstrap(new NioDatagramChannelFactory(
Executors.newCachedThreadPool(), 4));
udpBootstrap.setOption("receiveBufferSize", receiveBufferSize.bytesAsInt());
udpBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
udpBootstrap.setOption("broadcast", "false");
udpBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new Handler("udp"));
}
});
InetAddress address;
try {
address = NetworkUtils.resolveInetAddress(host, null);
} catch (IOException e) {
logger.warn("failed to resolve host {}", e, host);
return;
}
final InetAddress hostAddress = address;
PortsRange portsRange = new PortsRange(port);
final AtomicReference<Exception> lastException = new AtomicReference<>();
boolean success = portsRange.iterate(new PortsRange.PortCallback() {
@Override
public boolean onPortNumber(int portNumber) {
try {
udpChannel = udpBootstrap.bind(new InetSocketAddress(hostAddress, portNumber));
} catch (Exception e) {
lastException.set(e);
return false;
}
return true;
}
});
if (!success) {
logger.warn("failed to bind to {}/{}", lastException.get(), hostAddress, port);
return;
}
logger.info("UDP listener running, address {}", udpChannel.getLocalAddress());
}
private void initializeTCP() {
tcpBootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(), Executors.newCachedThreadPool(),
settings.getAsInt("tcp.worker", 4)));
tcpBootstrap.setOption("receiveBufferSize", receiveBufferSize.bytesAsInt());
tcpBootstrap.setOption("receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
tcpBootstrap.setOption("reuseAddress", settings.getAsBoolean("tcp.reuse_address", true));
tcpBootstrap.setOption("tcpNoDelay", settings.getAsBoolean("tcp.no_delay", true));
tcpBootstrap.setOption("keepAlive", settings.getAsBoolean("tcp.keep_alive", true));
tcpBootstrap.setOption("child.receiveBufferSize", receiveBufferSize.bytesAsInt());
tcpBootstrap.setOption("child.receiveBufferSizePredictorFactory", receiveBufferSizePredictorFactory);
tcpBootstrap.setOption("child.reuseAddress", settings.getAsBoolean("tcp.reuse_address", true));
tcpBootstrap.setOption("child.tcpNoDelay", settings.getAsBoolean("tcp.no_delay", true));
tcpBootstrap.setOption("child.keepAlive", settings.getAsBoolean("tcp.keep_alive", true));
tcpBootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new Handler("tcp"));
}
});
InetAddress address;
try {
address = SyslogNetworkUtils.resolveInetAddress(host, null);
} catch (IOException e) {
logger.warn("failed to resolve host {}", e, host);
return;
}
final InetAddress hostAddress = address;
PortsRange portsRange = new PortsRange(port);
final AtomicReference<Exception> lastException = new AtomicReference<>();
boolean success = portsRange.iterate(new PortsRange.PortCallback() {
@Override
public boolean onPortNumber(int portNumber) {
try {
tcpChannel = tcpBootstrap.bind(new InetSocketAddress(hostAddress, portNumber));
} catch (Exception e) {
lastException.set(e);
return false;
}
return true;
}
});
if (!success) {
logger.warn("failed to bind to {}/{}", lastException.get(), hostAddress, port);
return;
}
logger.info("TCP listener running, address {}", tcpChannel.getLocalAddress());
}
class Handler extends SimpleChannelUpstreamHandler {
private final String protocol;
Handler(String protocol) {
this.protocol = protocol;
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
ChannelBuffer buffer = (ChannelBuffer) e.getMessage();
XContentBuilder builder = jsonBuilder();
parse(ctx, buffer, builder);
IndexRequest indexRequest = new IndexRequest(isTimeWindow ? formatter.print(new DateTime()) : index)
.type(type)
.opType(IndexRequest.OpType.INDEX)
.source(builder);
try {
bulkProcessor.add(indexRequest);
} catch (Exception e1) {
logger.warn("failed to execute bulk request", e1);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
if (e.getCause() instanceof BindException) {
// ignore, this happens when we retry binding to several ports, its fine if we fail...
return;
}
logger.warn("failure caught", e.getCause());
throw new IOException(e.getCause());
}
private void parse(ChannelHandlerContext ctx, ChannelBuffer buffer, Builder builder) throws IOException {
SocketAddress localAddress = ctx.getChannel().getLocalAddress();
SocketAddress remoteAddress = ctx.getChannel().getRemoteAddress();
ChannelBufferBytesReference ref = new ChannelBufferBytesReference(buffer);
try {
builder.startObject();
builder.field("protocol", protocol);
if (localAddress != null) {
builder.field("local", localAddress.toString());
}
if (remoteAddress != null) {
builder.field("remote", remoteAddress.toString());
}
messageParser.parseMessage(ref.toUtf8(), builder);
builder.endObject();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
}
*/
}

View file

@ -1,6 +1,6 @@
package org.xbib.event.timer; package org.xbib.event.timer;
import com.google.common.eventbus.EventBus; import org.xbib.event.bus.EventBus;
import org.xbib.event.persistence.FilePersistenceStore; import org.xbib.event.persistence.FilePersistenceStore;
import org.xbib.event.persistence.PersistenceStore; import org.xbib.event.persistence.PersistenceStore;
import org.xbib.settings.Settings; import org.xbib.settings.Settings;

View file

@ -1,6 +1,6 @@
package org.xbib.event.timer; package org.xbib.event.timer;
import com.google.common.eventbus.EventBus; import org.xbib.event.bus.EventBus;
import org.xbib.event.persistence.PersistenceStore; import org.xbib.event.persistence.PersistenceStore;
import java.io.Closeable; import java.io.Closeable;

View file

@ -23,6 +23,9 @@ import java.util.function.Predicate;
*/ */
public abstract class AsyncQuery<T> implements AsyncTraverser<T> { public abstract class AsyncQuery<T> implements AsyncTraverser<T> {
public AsyncQuery() {
}
/** /**
* Returns an asynchronous sequential ordered query whose elements * Returns an asynchronous sequential ordered query whose elements
* are the specified values in data parameter. * are the specified values in data parameter.

View file

@ -0,0 +1,62 @@
package org.xbib.event.bus;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class AsyncEventBusTest {
private static final String EVENT = "Hello";
private FakeExecutor executor;
private AsyncEventBus bus;
@BeforeEach
protected void setUp() {
executor = new FakeExecutor();
bus = new AsyncEventBus(executor);
}
@Test
public void testBasicDistribution() {
StringCatcher catcher = new StringCatcher();
bus.register(catcher);
// We post the event, but our Executor will not deliver it until instructed.
bus.post(EVENT);
List<String> events = catcher.getEvents();
assertTrue(events.isEmpty(), "No events should be delivered synchronously.");
// Now we find the task in our Executor and explicitly activate it.
List<Runnable> tasks = executor.getTasks();
assertEquals(1, tasks.size(), "One event dispatch task should be queued.");
tasks.get(0).run();
assertEquals(1, events.size(), "One event should be delivered.");
assertEquals(EVENT, events.get(0), "Correct string should be delivered.");
}
/**
* An {@link Executor} wanna-be that simply records the tasks it's given. Arguably the Worst
* Executor Ever.
*/
public static class FakeExecutor implements Executor {
List<Runnable> tasks = new ArrayList<>();
@Override
public void execute(Runnable task) {
tasks.add(task);
}
public List<Runnable> getTasks() {
return tasks;
}
}
}

View file

@ -0,0 +1,308 @@
package org.xbib.event.bus;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.fail;
public class EventBusTest {
private static final Logger logger = Logger.getLogger(EventBusTest.class.getName());
private static final String EVENT = "Hello";
private static final String BUS_IDENTIFIER = "test-bus";
private EventBus bus;
@BeforeEach
public void create() {
bus = new EventBus(BUS_IDENTIFIER);
}
@Test
public void testPolymorphicDistribution() {
// Three catchers for related types String, Object, and Comparable<?>.
// String isa Object
// String isa Comparable<?>
// Comparable<?> isa Object
StringCatcher stringCatcher = new StringCatcher();
final List<Object> objectEvents = new ArrayList<>();
Object objCatcher =
new Object() {
@SuppressWarnings("unused")
@Subscribe
public void eat(Object food) {
objectEvents.add(food);
}
};
final List<Comparable<?>> compEvents = new ArrayList<>();
Object compCatcher =
new Object() {
@SuppressWarnings("unused")
@Subscribe
public void eat(Comparable<?> food) {
compEvents.add(food);
}
};
bus.register(stringCatcher);
bus.register(objCatcher);
bus.register(compCatcher);
// Two additional event types: Object and Comparable<?> (played by Integer)
Object objEvent = new Object();
Object compEvent = 6;
bus.post(EVENT);
bus.post(objEvent);
bus.post(compEvent);
List<String> stringEvents = stringCatcher.getEvents();
logger.log(Level.INFO, stringEvents.toString());
assertEquals(1, stringEvents.size(), "Only one String should be delivered.");
assertEquals(EVENT, stringEvents.get(0), "Correct string should be delivered.");
// Check the Catcher<Object>...
assertEquals(3, objectEvents.size(), "Three Objects should be delivered.");
assertEquals(EVENT, objectEvents.get(0), "String fixture must be first object delivered.");
assertEquals(objEvent, objectEvents.get(1), "Object fixture must be second object delivered.");
assertEquals(compEvent, objectEvents.get(2), "Comparable fixture must be thirdobject delivered.");
// Check the Catcher<Comparable<?>>...
assertEquals(2, compEvents.size(), "Two Comparable<?>s should be delivered.");
assertEquals(EVENT, compEvents.get(0), "String fixture must be first comparable delivered.");
assertEquals(compEvent, compEvents.get(1), "Comparable fixture must be second comparable delivered.");
}
@Test
public void testSubscriberThrowsException() throws Exception {
final RecordingSubscriberExceptionHandler handler = new RecordingSubscriberExceptionHandler();
final EventBus eventBus = new EventBus(handler);
final RuntimeException exception =
new RuntimeException("but culottes have a tendancy to ride up!");
final Object subscriber =
new Object() {
@Subscribe
public void throwExceptionOn(String message) {
throw exception;
}
};
eventBus.register(subscriber);
eventBus.post(EVENT);
assertNotNull(handler.getContext());
assertEquals(exception, handler.getException(), "Cause should be available.");
assertEquals(eventBus, handler.getContext().getEventBus(), "EventBus should be available.");
assertEquals(EVENT, handler.getContext().getEvent(), "Event should be available.");
assertEquals(subscriber, handler.getContext().getSubscriber(), "Subscriber should be available.");
assertEquals(subscriber.getClass().getMethod("throwExceptionOn", String.class),
handler.getContext().getSubscriberMethod(),"Method should be available.");
}
static class RecordingSubscriberExceptionHandler implements SubscriberExceptionHandler {
private SubscriberExceptionContext context;
private Throwable exception;
public RecordingSubscriberExceptionHandler() {
}
@Override
public void handleException(Throwable exception, SubscriberExceptionContext context) {
this.exception = exception;
this.context = context;
}
public SubscriberExceptionContext getContext() {
return context;
}
public Throwable getException() {
return exception;
}
}
@Test
public void testSubscriberThrowsExceptionHandlerThrowsException() throws Exception {
final EventBus eventBus = new EventBus((exception, context) -> {
throw new RuntimeException();
});
final Object subscriber =
new Object() {
@Subscribe
public void throwExceptionOn(String message) {
throw new RuntimeException();
}
};
eventBus.register(subscriber);
try {
eventBus.post(EVENT);
} catch (RuntimeException e) {
fail("Exception should not be thrown.");
}
}
@Test
public void testDeadEventForwarding() {
GhostCatcher catcher = new GhostCatcher();
bus.register(catcher);
bus.post(EVENT);
List<DeadEvent> events = catcher.getEvents();
assertEquals(1, events.size(), "One dead event should be delivered.");
assertEquals(EVENT, events.get(0).getEvent(), "The dead event should wrap the original event.");
}
static class GhostCatcher {
private final List<DeadEvent> events = new ArrayList<>();
@Subscribe
public void ohNoesIHaveDied(DeadEvent event) {
events.add(event);
}
public List<DeadEvent> getEvents() {
return events;
}
}
@Test
public void testMissingSubscribe() {
bus.register(new Object());
}
@Test
public void testUnregister() {
StringCatcher catcher1 = new StringCatcher();
StringCatcher catcher2 = new StringCatcher();
try {
bus.unregister(catcher1);
fail("Attempting to unregister an unregistered object succeeded");
} catch (IllegalArgumentException expected) {
// OK.
}
bus.register(catcher1);
bus.post(EVENT);
bus.register(catcher2);
bus.post(EVENT);
List<String> expectedEvents = new ArrayList<>();
expectedEvents.add(EVENT);
expectedEvents.add(EVENT);
assertEquals(expectedEvents, catcher1.getEvents(), "Two correct events should be delivered.");
assertEquals(List.of(EVENT), catcher2.getEvents(), "One correct event should be delivered.");
bus.unregister(catcher1);
bus.post(EVENT);
assertEquals(expectedEvents, catcher1.getEvents(), "Shouldn't catch any more events when unregistered.");
assertEquals(expectedEvents, catcher2.getEvents(), "Two correct events should be delivered.");
try {
bus.unregister(catcher1);
fail("Attempting to unregister an unregistered object succeeded");
} catch (IllegalArgumentException expected) {
// OK.
}
bus.unregister(catcher2);
bus.post(EVENT);
assertEquals(expectedEvents, catcher1.getEvents(), "Shouldn't catch any more events when unregistered.");
assertEquals(expectedEvents, catcher2.getEvents(), "Shouldn't catch any more events when unregistered.");
}
// NOTE: This test will always pass if register() is thread-safe but may also
// pass if it isn't, though this is unlikely.
@Test
public void testRegisterThreadSafety() throws Exception {
List<StringCatcher> catchers = new CopyOnWriteArrayList<>();
List<Future<?>> futures = new ArrayList<>();
ExecutorService executor = Executors.newFixedThreadPool(10);
int numberOfCatchers = 10000;
for (int i = 0; i < numberOfCatchers; i++) {
futures.add(executor.submit(new Registrator(bus, catchers)));
}
for (int i = 0; i < numberOfCatchers; i++) {
futures.get(i).get();
}
assertEquals(numberOfCatchers, catchers.size(), "Unexpected number of catchers in the list");
bus.post(EVENT);
List<String> expectedEvents = List.of(EVENT);
for (StringCatcher catcher : catchers) {
assertEquals(expectedEvents, catcher.getEvents(), "One of the registered catchers did not receive an event.");
}
}
/** Runnable which registers a StringCatcher on an event bus and adds it to a list. */
private static class Registrator implements Runnable {
private final EventBus bus;
private final List<StringCatcher> catchers;
Registrator(EventBus bus, List<StringCatcher> catchers) {
this.bus = bus;
this.catchers = catchers;
}
@Override
public void run() {
StringCatcher catcher = new StringCatcher();
bus.register(catcher);
catchers.add(catcher);
}
}
@Test
public void testToString() {
EventBus eventBus = new EventBus("a b ; - \" < > / \\");
assertEquals("EventBus{a b ; - \" < > / \\ €}", eventBus.toString());
}
/**
* Tests that bridge methods are not subscribed to events. In Java 8, annotations are included on
* the bridge method in addition to the original method, which causes both the original and bridge
* methods to be subscribed (since both are annotated @Subscribe) without specifically checking
* for bridge methods.
*/
@Test
public void testRegistrationWithBridgeMethod() {
final AtomicInteger calls = new AtomicInteger();
bus.register(
new Callback<String>() {
@Subscribe
@Override
public void call(String s) {
calls.incrementAndGet();
}
});
bus.post("hello");
assertEquals(1, calls.get());
}
private interface Callback<T> {
void call(T t);
}
@Test
public void testPrimitiveSubscribeFails() {
class SubscribesToPrimitive {
@Subscribe
public void toInt(int i) {}
}
try {
bus.register(new SubscribesToPrimitive());
fail("should have thrown");
} catch (IllegalArgumentException expected) {
}
}
}

View file

@ -0,0 +1,74 @@
package org.xbib.event.bus;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* Validate that {@link EventBus} behaves carefully when listeners publish their own events.
*/
public class ReentrantEventsTest {
static final String FIRST = "one";
static final Double SECOND = 2.0d;
final EventBus bus = new EventBus();
@Test
public void testNoReentrantEvents() {
ReentrantEventsHater hater = new ReentrantEventsHater();
bus.register(hater);
bus.post(FIRST);
assertEquals(List.of(FIRST, SECOND), hater.eventsReceived, "ReentrantEventHater expected 2 events");
}
public class ReentrantEventsHater {
boolean ready = true;
List<Object> eventsReceived = new ArrayList<>();
@Subscribe
public void listenForStrings(String event) {
eventsReceived.add(event);
ready = false;
try {
bus.post(SECOND);
} finally {
ready = true;
}
}
@Subscribe
public void listenForDoubles(Double event) {
assertTrue(ready, "I received an event when I wasn't ready!");
eventsReceived.add(event);
}
}
@Test
public void testEventOrderingIsPredictable() {
EventProcessor processor = new EventProcessor();
bus.register(processor);
EventRecorder recorder = new EventRecorder();
bus.register(recorder);
bus.post(FIRST);
assertEquals(List.of(FIRST, SECOND), recorder.eventsReceived, "EventRecorder expected events in order");
}
class EventProcessor {
@Subscribe
public void listenForStrings(String event) {
bus.post(SECOND);
}
}
static class EventRecorder {
List<Object> eventsReceived = new ArrayList<>();
@Subscribe
public void listenForEverything(Object event) {
eventsReceived.add(event);
}
}
}

View file

@ -0,0 +1,30 @@
package org.xbib.event.bus;
import java.util.ArrayList;
import java.util.List;
import static org.junit.jupiter.api.Assertions.fail;
/**
* A simple EventSubscriber mock that records Strings.
*
* <p>For testing fun, also includes a landmine method that EventBus tests are required <em>not</em>
* to call ({@link #methodWithoutAnnotation(String)}).
*
*/
public class StringCatcher {
private final List<String> events = new ArrayList<>();
@Subscribe
public void hereHaveAString(String string) {
events.add(string);
}
public void methodWithoutAnnotation(String string) {
fail("Event bus must not call methods without @Subscribe!");
}
public List<String> getEvents() {
return events;
}
}