diff --git a/gradle.properties b/gradle.properties index 109b9f9..55ac70f 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ group = org.xbib name = event -version = 0.0.7 +version = 0.0.8 org.gradle.warning.mode = ALL diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 7f93135..d64cd49 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 8838ba9..e6aba25 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-all.zip networkTimeout=10000 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME diff --git a/settings.gradle b/settings.gradle index cddbd06..4c03b79 100644 --- a/settings.gradle +++ b/settings.gradle @@ -15,11 +15,10 @@ pluginManagement { dependencyResolutionManagement { versionCatalogs { libs { - version('gradle', '8.4') - version('groovy', '4.0.13') + version('gradle', '8.5') version('datastructures', '5.0.5') - version('netty', '4.1.100.Final') - version('net', '4.0.0') + version('netty', '4.1.101.Final') + version('net', '4.0.2') library('netty-handler', 'io.netty', 'netty-handler').versionRef('netty') library('net', 'org.xbib', 'net').versionRef('net') library('datastructures-common', 'org.xbib', 'datastructures-common').versionRef('datastructures') diff --git a/src/main/java/org/xbib/event/CompleteFuture.java b/src/main/java/org/xbib/event/CompleteFuture.java index ad51d5c..d1a507b 100644 --- a/src/main/java/org/xbib/event/CompleteFuture.java +++ b/src/main/java/org/xbib/event/CompleteFuture.java @@ -34,6 +34,7 @@ public abstract class CompleteFuture extends AbstractFuture { return this; } + @SuppressWarnings("unchecked") @Override public Future addListeners(GenericFutureListener>... listeners) { for (GenericFutureListener> l: @@ -53,6 +54,7 @@ public abstract class CompleteFuture extends AbstractFuture { return this; } + @SuppressWarnings("unchecked") @Override public Future removeListeners(GenericFutureListener>... listeners) { // NOOP diff --git a/src/main/java/org/xbib/event/DefaultFutureListeners.java b/src/main/java/org/xbib/event/DefaultFutureListeners.java index 9e0eb69..07d9395 100644 --- a/src/main/java/org/xbib/event/DefaultFutureListeners.java +++ b/src/main/java/org/xbib/event/DefaultFutureListeners.java @@ -9,8 +9,7 @@ final class DefaultFutureListeners { private int progressiveSize; // the number of progressive listeners @SuppressWarnings("unchecked") - DefaultFutureListeners( - GenericFutureListener> first, GenericFutureListener> second) { + DefaultFutureListeners(GenericFutureListener> first, GenericFutureListener> second) { listeners = new GenericFutureListener[2]; listeners[0] = first; listeners[1] = second; diff --git a/src/main/java/org/xbib/event/DefaultProgressivePromise.java b/src/main/java/org/xbib/event/DefaultProgressivePromise.java index 8c0b9e7..c4ec00e 100644 --- a/src/main/java/org/xbib/event/DefaultProgressivePromise.java +++ b/src/main/java/org/xbib/event/DefaultProgressivePromise.java @@ -60,6 +60,7 @@ public class DefaultProgressivePromise extends DefaultPromise implements P return this; } + @SuppressWarnings("unchecked") @Override public ProgressivePromise addListeners(GenericFutureListener>... listeners) { super.addListeners(listeners); @@ -72,6 +73,7 @@ public class DefaultProgressivePromise extends DefaultPromise implements P return this; } + @SuppressWarnings("unchecked") @Override public ProgressivePromise removeListeners(GenericFutureListener>... listeners) { super.removeListeners(listeners); diff --git a/src/main/java/org/xbib/event/DefaultPromise.java b/src/main/java/org/xbib/event/DefaultPromise.java index ccefa9a..510e7fd 100644 --- a/src/main/java/org/xbib/event/DefaultPromise.java +++ b/src/main/java/org/xbib/event/DefaultPromise.java @@ -170,6 +170,7 @@ public class DefaultPromise extends AbstractFuture implements Promise { return this; } + @SuppressWarnings("unchecked") @Override public Promise addListeners(GenericFutureListener>... listeners) { Objects.requireNonNull(listeners, "listeners"); diff --git a/src/main/java/org/xbib/event/EventConsumer.java b/src/main/java/org/xbib/event/EventConsumer.java index 65171e7..1aa7d09 100644 --- a/src/main/java/org/xbib/event/EventConsumer.java +++ b/src/main/java/org/xbib/event/EventConsumer.java @@ -1,6 +1,4 @@ package org.xbib.event; -import java.io.Closeable; - -public interface EventConsumer extends Closeable { +public interface EventConsumer { } diff --git a/src/main/java/org/xbib/event/EventManager.java b/src/main/java/org/xbib/event/EventManager.java index 8352cf5..ccd27cc 100644 --- a/src/main/java/org/xbib/event/EventManager.java +++ b/src/main/java/org/xbib/event/EventManager.java @@ -23,7 +23,7 @@ import java.util.concurrent.Executors; import java.util.logging.Level; import java.util.logging.Logger; -public final class EventManager implements Closeable { +public final class EventManager { private static final Logger logger = Logger.getLogger(EventManager.class.getName()); @@ -79,17 +79,12 @@ public final class EventManager implements Closeable { return syslogEventManager; } - @Override - public boolean equals(Object obj) { - return super.equals(obj); - } - - @Override public void close() throws IOException { for (EventConsumer eventConsumer : builder.eventConsumers) { - eventConsumer.close(); + if (eventConsumer instanceof Closeable closeable) { + closeable.close(); + } } - genericEventManager.close(); clockEventManager.close(); timerEventManager.close(); fileFollowEventManager.close(); diff --git a/src/main/java/org/xbib/event/Future.java b/src/main/java/org/xbib/event/Future.java index cbfd227..ecb35cd 100644 --- a/src/main/java/org/xbib/event/Future.java +++ b/src/main/java/org/xbib/event/Future.java @@ -11,10 +11,12 @@ public interface Future extends java.util.concurrent.Future { Future addListener(GenericFutureListener> var1); + @SuppressWarnings("unchecked") Future addListeners(GenericFutureListener>... var1); Future removeListener(GenericFutureListener> var1); + @SuppressWarnings("unchecked") Future removeListeners(GenericFutureListener>... var1); Future sync() throws InterruptedException; diff --git a/src/main/java/org/xbib/event/ProgressiveFuture.java b/src/main/java/org/xbib/event/ProgressiveFuture.java index 0e2463a..997e14e 100644 --- a/src/main/java/org/xbib/event/ProgressiveFuture.java +++ b/src/main/java/org/xbib/event/ProgressiveFuture.java @@ -8,12 +8,14 @@ public interface ProgressiveFuture extends Future { @Override ProgressiveFuture addListener(GenericFutureListener> listener); + @SuppressWarnings("unchecked") @Override ProgressiveFuture addListeners(GenericFutureListener>... listeners); @Override ProgressiveFuture removeListener(GenericFutureListener> listener); + @SuppressWarnings("unchecked") @Override ProgressiveFuture removeListeners(GenericFutureListener>... listeners); diff --git a/src/main/java/org/xbib/event/ProgressivePromise.java b/src/main/java/org/xbib/event/ProgressivePromise.java index 40cd0d3..6cb84c4 100644 --- a/src/main/java/org/xbib/event/ProgressivePromise.java +++ b/src/main/java/org/xbib/event/ProgressivePromise.java @@ -27,12 +27,14 @@ public interface ProgressivePromise extends Promise, ProgressiveFuture @Override ProgressivePromise addListener(GenericFutureListener> listener); + @SuppressWarnings("unchecked") @Override ProgressivePromise addListeners(GenericFutureListener>... listeners); @Override ProgressivePromise removeListener(GenericFutureListener> listener); + @SuppressWarnings("unchecked") @Override ProgressivePromise removeListeners(GenericFutureListener>... listeners); diff --git a/src/main/java/org/xbib/event/Promise.java b/src/main/java/org/xbib/event/Promise.java index cd13a7a..0bebac2 100644 --- a/src/main/java/org/xbib/event/Promise.java +++ b/src/main/java/org/xbib/event/Promise.java @@ -13,10 +13,12 @@ public interface Promise extends Future { Promise addListener(GenericFutureListener> value); + @SuppressWarnings("unchecked") Promise addListeners(GenericFutureListener>... value); Promise removeListener(GenericFutureListener> value); + @SuppressWarnings("unchecked") Promise removeListeners(GenericFutureListener>... value); Promise await() throws InterruptedException; diff --git a/src/main/java/org/xbib/event/async/CompositeFuture.java b/src/main/java/org/xbib/event/async/CompositeFuture.java index 14c3a35..fba7080 100644 --- a/src/main/java/org/xbib/event/async/CompositeFuture.java +++ b/src/main/java/org/xbib/event/async/CompositeFuture.java @@ -13,233 +13,233 @@ import java.util.List; */ public interface CompositeFuture extends Future { - /** - * Return a composite future, succeeded when all futures are succeeded, failed when any future is failed. - *

- * The returned future fails as soon as one of {@code f1} or {@code f2} fails. - * - * @param f1 future - * @param f2 future - * @return the composite future - */ - static CompositeFuture all(Future f1, Future f2) { - return CompositeFutureImpl.all(f1, f2); - } - - /** - * Like {@link #all(Future, Future)} but with 3 futures. - */ - static CompositeFuture all(Future f1, Future f2, Future f3) { - return CompositeFutureImpl.all(f1, f2, f3); - } - - /** - * Like {@link #all(Future, Future)} but with 4 futures. - */ - static CompositeFuture all(Future f1, Future f2, Future f3, Future f4) { - return CompositeFutureImpl.all(f1, f2, f3, f4); - } - - /** - * Like {@link #all(Future, Future)} but with 5 futures. - */ - static CompositeFuture all(Future f1, Future f2, Future f3, Future f4, Future f5) { - return CompositeFutureImpl.all(f1, f2, f3, f4, f5); - } - - /** - * Like {@link #all(Future, Future)} but with 6 futures. - */ - static CompositeFuture all(Future f1, Future f2, Future f3, Future f4, Future f5, Future f6) { - return CompositeFutureImpl.all(f1, f2, f3, f4, f5, f6); - } - - /** - * Like {@link #all(Future, Future)} but with a list of futures.

- * - * When the list is empty, the returned future will be already completed. - */ - static CompositeFuture all(List futures) { - return CompositeFutureImpl.all(futures.toArray(new Future[0])); - } - - /** - * Return a composite future, succeeded when any futures is succeeded, failed when all futures are failed. - *

- * The returned future succeeds as soon as one of {@code f1} or {@code f2} succeeds. - * - * @param f1 future - * @param f2 future - * @return the composite future - */ - static CompositeFuture any(Future f1, Future f2) { - return CompositeFutureImpl.any(f1, f2); - } - - /** - * Like {@link #any(Future, Future)} but with 3 futures. - */ - static CompositeFuture any(Future f1, Future f2, Future f3) { - return CompositeFutureImpl.any(f1, f2, f3); - } - - /** - * Like {@link #any(Future, Future)} but with 4 futures. - */ - static CompositeFuture any(Future f1, Future f2, Future f3, Future f4) { - return CompositeFutureImpl.any(f1, f2, f3, f4); - } - - /** - * Like {@link #any(Future, Future)} but with 5 futures. - */ - static CompositeFuture any(Future f1, Future f2, Future f3, Future f4, Future f5) { - return CompositeFutureImpl.any(f1, f2, f3, f4, f5); - } - - /** - * Like {@link #any(Future, Future)} but with 6 futures. - */ - static CompositeFuture any(Future f1, Future f2, Future f3, Future f4, Future f5, Future f6) { - return CompositeFutureImpl.any(f1, f2, f3, f4, f5, f6); - } - - /** - * Like {@link #any(Future, Future)} but with a list of futures.

- * - * When the list is empty, the returned future will be already completed. - */ - static CompositeFuture any(List futures) { - return CompositeFutureImpl.any(futures.toArray(new Future[0])); - } - - /** - * Return a composite future, succeeded when all futures are succeeded, failed when any future is failed. - *

- * It always wait until all its futures are completed and will not fail as soon as one of {@code f1} or {@code f2} fails. - * - * @param f1 future - * @param f2 future - * @return the composite future - */ - static CompositeFuture join(Future f1, Future f2) { - return CompositeFutureImpl.join(f1, f2); - } - - /** - * Like {@link #join(Future, Future)} but with 3 futures. - */ - static CompositeFuture join(Future f1, Future f2, Future f3) { - return CompositeFutureImpl.join(f1, f2, f3); - } - - /** - * Like {@link #join(Future, Future)} but with 4 futures. - */ - static CompositeFuture join(Future f1, Future f2, Future f3, Future f4) { - return CompositeFutureImpl.join(f1, f2, f3, f4); - } - - /** - * Like {@link #join(Future, Future)} but with 5 futures. - */ - static CompositeFuture join(Future f1, Future f2, Future f3, Future f4, Future f5) { - return CompositeFutureImpl.join(f1, f2, f3, f4, f5); - } - - /** - * Like {@link #join(Future, Future)} but with 6 futures. - */ - static CompositeFuture join(Future f1, Future f2, Future f3, Future f4, Future f5, Future f6) { - return CompositeFutureImpl.join(f1, f2, f3, f4, f5, f6); - } - - /** - * Like {@link #join(Future, Future)} but with a list of futures.

- * - * When the list is empty, the returned future will be already completed. - */ - static CompositeFuture join(List futures) { - return CompositeFutureImpl.join(futures.toArray(new Future[0])); - } - - @Override - CompositeFuture onComplete(Handler> handler); - - @Override - default CompositeFuture onSuccess(Handler handler) { - Future.super.onSuccess(handler); - return this; - } - - @Override - default CompositeFuture onFailure(Handler handler) { - Future.super.onFailure(handler); - return this; - } - - /** - * Returns a cause of a wrapped future - * - * @param index the wrapped future index - */ - Throwable cause(int index); - - /** - * Returns true if a wrapped future is succeeded - * - * @param index the wrapped future index - */ - boolean succeeded(int index); - - /** - * Returns true if a wrapped future is failed - * - * @param index the wrapped future index - */ - boolean failed(int index); - - /** - * Returns true if a wrapped future is completed - * - * @param index the wrapped future index - */ - boolean isComplete(int index); - - /** - * Returns the result of a wrapped future - * - * @param index the wrapped future index - */ - T resultAt(int index); - - /** - * @return the number of wrapped future - */ - int size(); - - /** - * @return a list of the current completed values. If one future is not yet resolved or is failed, {@code} null - * will be used - */ - default List list() { - int size = size(); - ArrayList list = new ArrayList<>(size); - for (int index = 0;index < size;index++) { - list.add(resultAt(index)); + /** + * Return a composite future, succeeded when all futures are succeeded, failed when any future is failed. + *

+ * The returned future fails as soon as one of {@code f1} or {@code f2} fails. + * + * @param f1 future + * @param f2 future + * @return the composite future + */ + static CompositeFuture all(Future f1, Future f2) { + return CompositeFutureImpl.all(f1, f2); } - return list; - } - /** - * @return a list of all the eventual failure causes. If no future failed, returns a list of null values. - */ - default List causes() { - int size = size(); - ArrayList list = new ArrayList<>(size); - for (int index = 0; index < size; index++) { - list.add(cause(index)); + /** + * Like {@link #all(Future, Future)} but with 3 futures. + */ + static CompositeFuture all(Future f1, Future f2, Future f3) { + return CompositeFutureImpl.all(f1, f2, f3); + } + + /** + * Like {@link #all(Future, Future)} but with 4 futures. + */ + static CompositeFuture all(Future f1, Future f2, Future f3, Future f4) { + return CompositeFutureImpl.all(f1, f2, f3, f4); + } + + /** + * Like {@link #all(Future, Future)} but with 5 futures. + */ + static CompositeFuture all(Future f1, Future f2, Future f3, Future f4, Future f5) { + return CompositeFutureImpl.all(f1, f2, f3, f4, f5); + } + + /** + * Like {@link #all(Future, Future)} but with 6 futures. + */ + static CompositeFuture all(Future f1, Future f2, Future f3, Future f4, Future f5, Future f6) { + return CompositeFutureImpl.all(f1, f2, f3, f4, f5, f6); + } + + /** + * Like {@link #all(Future, Future)} but with a list of futures.

+ * + * When the list is empty, the returned future will be already completed. + */ + static CompositeFuture all(List> futures) { + return CompositeFutureImpl.all(futures.toArray(new Future[0])); + } + + /** + * Return a composite future, succeeded when any futures is succeeded, failed when all futures are failed. + *

+ * The returned future succeeds as soon as one of {@code f1} or {@code f2} succeeds. + * + * @param f1 future + * @param f2 future + * @return the composite future + */ + static CompositeFuture any(Future f1, Future f2) { + return CompositeFutureImpl.any(f1, f2); + } + + /** + * Like {@link #any(Future, Future)} but with 3 futures. + */ + static CompositeFuture any(Future f1, Future f2, Future f3) { + return CompositeFutureImpl.any(f1, f2, f3); + } + + /** + * Like {@link #any(Future, Future)} but with 4 futures. + */ + static CompositeFuture any(Future f1, Future f2, Future f3, Future f4) { + return CompositeFutureImpl.any(f1, f2, f3, f4); + } + + /** + * Like {@link #any(Future, Future)} but with 5 futures. + */ + static CompositeFuture any(Future f1, Future f2, Future f3, Future f4, Future f5) { + return CompositeFutureImpl.any(f1, f2, f3, f4, f5); + } + + /** + * Like {@link #any(Future, Future)} but with 6 futures. + */ + static CompositeFuture any(Future f1, Future f2, Future f3, Future f4, Future f5, Future f6) { + return CompositeFutureImpl.any(f1, f2, f3, f4, f5, f6); + } + + /** + * Like {@link #any(Future, Future)} but with a list of futures.

+ * + * When the list is empty, the returned future will be already completed. + */ + static CompositeFuture any(List> futures) { + return CompositeFutureImpl.any(futures.toArray(new Future[0])); + } + + /** + * Return a composite future, succeeded when all futures are succeeded, failed when any future is failed. + *

+ * It always wait until all its futures are completed and will not fail as soon as one of {@code f1} or {@code f2} fails. + * + * @param f1 future + * @param f2 future + * @return the composite future + */ + static CompositeFuture join(Future f1, Future f2) { + return CompositeFutureImpl.join(f1, f2); + } + + /** + * Like {@link #join(Future, Future)} but with 3 futures. + */ + static CompositeFuture join(Future f1, Future f2, Future f3) { + return CompositeFutureImpl.join(f1, f2, f3); + } + + /** + * Like {@link #join(Future, Future)} but with 4 futures. + */ + static CompositeFuture join(Future f1, Future f2, Future f3, Future f4) { + return CompositeFutureImpl.join(f1, f2, f3, f4); + } + + /** + * Like {@link #join(Future, Future)} but with 5 futures. + */ + static CompositeFuture join(Future f1, Future f2, Future f3, Future f4, Future f5) { + return CompositeFutureImpl.join(f1, f2, f3, f4, f5); + } + + /** + * Like {@link #join(Future, Future)} but with 6 futures. + */ + static CompositeFuture join(Future f1, Future f2, Future f3, Future f4, Future f5, Future f6) { + return CompositeFutureImpl.join(f1, f2, f3, f4, f5, f6); + } + + /** + * Like {@link #join(Future, Future)} but with a list of futures.

+ * + * When the list is empty, the returned future will be already completed. + */ + static CompositeFuture join(List> futures) { + return CompositeFutureImpl.join(futures.toArray(new Future[0])); + } + + @Override + CompositeFuture onComplete(Handler> handler); + + @Override + default CompositeFuture onSuccess(Handler handler) { + Future.super.onSuccess(handler); + return this; + } + + @Override + default CompositeFuture onFailure(Handler handler) { + Future.super.onFailure(handler); + return this; + } + + /** + * Returns a cause of a wrapped future + * + * @param index the wrapped future index + */ + Throwable cause(int index); + + /** + * Returns true if a wrapped future is succeeded + * + * @param index the wrapped future index + */ + boolean succeeded(int index); + + /** + * Returns true if a wrapped future is failed + * + * @param index the wrapped future index + */ + boolean failed(int index); + + /** + * Returns true if a wrapped future is completed + * + * @param index the wrapped future index + */ + boolean isComplete(int index); + + /** + * Returns the result of a wrapped future + * + * @param index the wrapped future index + */ + T resultAt(int index); + + /** + * @return the number of wrapped future + */ + int size(); + + /** + * @return a list of the current completed values. If one future is not yet resolved or is failed, {@code} null + * will be used + */ + default List list() { + int size = size(); + ArrayList list = new ArrayList<>(size); + for (int index = 0;index < size;index++) { + list.add(resultAt(index)); + } + return list; + } + + /** + * @return a list of all the eventual failure causes. If no future failed, returns a list of null values. + */ + default List causes() { + int size = size(); + ArrayList list = new ArrayList<>(size); + for (int index = 0; index < size; index++) { + list.add(cause(index)); + } + return list; } - return list; - } } diff --git a/src/main/java/org/xbib/event/async/Future.java b/src/main/java/org/xbib/event/async/Future.java index 3d894e1..413e552 100644 --- a/src/main/java/org/xbib/event/async/Future.java +++ b/src/main/java/org/xbib/event/async/Future.java @@ -13,402 +13,403 @@ import java.util.function.Function; */ public interface Future extends AsyncResult { - /** - * Create a future that hasn't completed yet and that is passed to the {@code handler} before it is returned. - * - * @param handler the handler - * @param the result type - * @return the future. - */ - static Future future(Handler> handler) { - Promise promise = Promise.promise(); - try { - handler.handle(promise); - } catch (Throwable e){ - promise.tryFail(e); + /** + * Create a future that hasn't completed yet and that is passed to the {@code handler} before it is returned. + * + * @param handler the handler + * @param the result type + * @return the future. + */ + static Future future(Handler> handler) { + Promise promise = Promise.promise(); + try { + handler.handle(promise); + } catch (Throwable e){ + promise.tryFail(e); + } + return promise.future(); } - return promise.future(); - } - /** - * Create a succeeded future with a null result - * - * @param the result type - * @return the future - */ - static Future succeededFuture() { - return (Future) SucceededFuture.EMPTY; - } - - /** - * Created a succeeded future with the specified result. - * - * @param result the result - * @param the result type - * @return the future - */ - static Future succeededFuture(T result) { - if (result == null) { - return succeededFuture(); - } else { - return new SucceededFuture<>(result); + /** + * Create a succeeded future with a null result + * + * @param the result type + * @return the future + */ + @SuppressWarnings("unchecked") + static Future succeededFuture() { + return (Future) SucceededFuture.EMPTY; } - } - /** - * Create a failed future with the specified failure cause. - * - * @param t the failure cause as a Throwable - * @param the result type - * @return the future - */ - static Future failedFuture(Throwable t) { - return new FailedFuture<>(t); - } + /** + * Created a succeeded future with the specified result. + * + * @param result the result + * @param the result type + * @return the future + */ + static Future succeededFuture(T result) { + if (result == null) { + return succeededFuture(); + } else { + return new SucceededFuture<>(result); + } + } - /** - * Create a failed future with the specified failure message. - * - * @param failureMessage the failure message - * @param the result type - * @return the future - */ - static Future failedFuture(String failureMessage) { - return new FailedFuture<>(failureMessage); - } + /** + * Create a failed future with the specified failure cause. + * + * @param t the failure cause as a Throwable + * @param the result type + * @return the future + */ + static Future failedFuture(Throwable t) { + return new FailedFuture<>(t); + } - /** - * Has the future completed? - *

- * It's completed if it's either succeeded or failed. - * - * @return true if completed, false if not - */ - boolean isComplete(); + /** + * Create a failed future with the specified failure message. + * + * @param failureMessage the failure message + * @param the result type + * @return the future + */ + static Future failedFuture(String failureMessage) { + return new FailedFuture<>(failureMessage); + } - /** - * Add a handler to be notified of the result. - *

- * WARNING: this is a terminal operation. - * If several {@code handler}s are registered, there is no guarantee that they will be invoked in order of registration. - * - * @param handler the handler that will be called with the result - * @return a reference to this, so it can be used fluently - */ - Future onComplete(Handler> handler); + /** + * Has the future completed? + *

+ * It's completed if it's either succeeded or failed. + * + * @return true if completed, false if not + */ + boolean isComplete(); - /** - * Add a handler to be notified of the succeeded result. - *

- * WARNING: this is a terminal operation. - * If several {@code handler}s are registered, there is no guarantee that they will be invoked in order of registration. - * - * @param handler the handler that will be called with the succeeded result - * @return a reference to this, so it can be used fluently - */ - default Future onSuccess(Handler handler) { - return onComplete(ar -> { - if (ar.succeeded()) { - handler.handle(ar.result()); - } - }); - } + /** + * Add a handler to be notified of the result. + *

+ * WARNING: this is a terminal operation. + * If several {@code handler}s are registered, there is no guarantee that they will be invoked in order of registration. + * + * @param handler the handler that will be called with the result + * @return a reference to this, so it can be used fluently + */ + Future onComplete(Handler> handler); - /** - * Add a handler to be notified of the failed result. - *

- * WARNING: this is a terminal operation. - * If several {@code handler}s are registered, there is no guarantee that they will be invoked in order of registration. - * - * @param handler the handler that will be called with the failed result - * @return a reference to this, so it can be used fluently - */ - default Future onFailure(Handler handler) { - return onComplete(ar -> { - if (ar.failed()) { - handler.handle(ar.cause()); - } - }); - } + /** + * Add a handler to be notified of the succeeded result. + *

+ * WARNING: this is a terminal operation. + * If several {@code handler}s are registered, there is no guarantee that they will be invoked in order of registration. + * + * @param handler the handler that will be called with the succeeded result + * @return a reference to this, so it can be used fluently + */ + default Future onSuccess(Handler handler) { + return onComplete(ar -> { + if (ar.succeeded()) { + handler.handle(ar.result()); + } + }); + } - /** - * The result of the operation. This will be null if the operation failed. - * - * @return the result or null if the operation failed. - */ - @Override - T result(); + /** + * Add a handler to be notified of the failed result. + *

+ * WARNING: this is a terminal operation. + * If several {@code handler}s are registered, there is no guarantee that they will be invoked in order of registration. + * + * @param handler the handler that will be called with the failed result + * @return a reference to this, so it can be used fluently + */ + default Future onFailure(Handler handler) { + return onComplete(ar -> { + if (ar.failed()) { + handler.handle(ar.cause()); + } + }); + } - /** - * A Throwable describing failure. This will be null if the operation succeeded. - * - * @return the cause or null if the operation succeeded. - */ - @Override - Throwable cause(); + /** + * The result of the operation. This will be null if the operation failed. + * + * @return the result or null if the operation failed. + */ + @Override + T result(); - /** - * Did it succeed? - * - * @return true if it succeded or false otherwise - */ - @Override - boolean succeeded(); + /** + * A Throwable describing failure. This will be null if the operation succeeded. + * + * @return the cause or null if the operation succeeded. + */ + @Override + Throwable cause(); - /** - * Did it fail? - * - * @return true if it failed or false otherwise - */ - @Override - boolean failed(); + /** + * Did it succeed? + * + * @return true if it succeded or false otherwise + */ + @Override + boolean succeeded(); - /** - * Alias for {@link #compose(Function)}. - */ - default Future flatMap(Function> mapper) { - return compose(mapper); - } + /** + * Did it fail? + * + * @return true if it failed or false otherwise + */ + @Override + boolean failed(); - /** - * Compose this future with a {@code mapper} function.

- * - * When this future (the one on which {@code compose} is called) succeeds, the {@code mapper} will be called with - * the completed value and this mapper returns another future object. This returned future completion will complete - * the future returned by this method call.

- * - * If the {@code mapper} throws an exception, the returned future will be failed with this exception.

- * - * When this future fails, the failure will be propagated to the returned future and the {@code mapper} - * will not be called. - * - * @param mapper the mapper function - * @return the composed future - */ - default Future compose(Function> mapper) { - return compose(mapper, Future::failedFuture); - } + /** + * Alias for {@link #compose(Function)}. + */ + default Future flatMap(Function> mapper) { + return compose(mapper); + } - /** - * Handles a failure of this Future by returning the result of another Future. - * If the mapper fails, then the returned future will be failed with this failure. - * - * @param mapper A function which takes the exception of a failure and returns a new future. - * @return A recovered future - */ - default Future recover(Function> mapper) { - return compose(Future::succeededFuture, mapper); - } + /** + * Compose this future with a {@code mapper} function.

+ * + * When this future (the one on which {@code compose} is called) succeeds, the {@code mapper} will be called with + * the completed value and this mapper returns another future object. This returned future completion will complete + * the future returned by this method call.

+ * + * If the {@code mapper} throws an exception, the returned future will be failed with this exception.

+ * + * When this future fails, the failure will be propagated to the returned future and the {@code mapper} + * will not be called. + * + * @param mapper the mapper function + * @return the composed future + */ + default Future compose(Function> mapper) { + return compose(mapper, Future::failedFuture); + } - /** - * Compose this future with a {@code successMapper} and {@code failureMapper} functions.

- * - * When this future (the one on which {@code compose} is called) succeeds, the {@code successMapper} will be called with - * the completed value and this mapper returns another future object. This returned future completion will complete - * the future returned by this method call.

- * - * When this future (the one on which {@code compose} is called) fails, the {@code failureMapper} will be called with - * the failure and this mapper returns another future object. This returned future completion will complete - * the future returned by this method call.

- * - * If any mapper function throws an exception, the returned future will be failed with this exception.

- * - * @param successMapper the function mapping the success - * @param failureMapper the function mapping the failure - * @return the composed future - */ - Future compose(Function> successMapper, Function> failureMapper); + /** + * Handles a failure of this Future by returning the result of another Future. + * If the mapper fails, then the returned future will be failed with this failure. + * + * @param mapper A function which takes the exception of a failure and returns a new future. + * @return A recovered future + */ + default Future recover(Function> mapper) { + return compose(Future::succeededFuture, mapper); + } - /** - * Transform this future with a {@code mapper} functions.

- * - * When this future (the one on which {@code transform} is called) completes, the {@code mapper} will be called with - * the async result and this mapper returns another future object. This returned future completion will complete - * the future returned by this method call.

- * - * If any mapper function throws an exception, the returned future will be failed with this exception.

- * - * @param mapper the function mapping the future - * @return the transformed future - */ - Future transform(Function, Future> mapper); + /** + * Compose this future with a {@code successMapper} and {@code failureMapper} functions.

+ * + * When this future (the one on which {@code compose} is called) succeeds, the {@code successMapper} will be called with + * the completed value and this mapper returns another future object. This returned future completion will complete + * the future returned by this method call.

+ * + * When this future (the one on which {@code compose} is called) fails, the {@code failureMapper} will be called with + * the failure and this mapper returns another future object. This returned future completion will complete + * the future returned by this method call.

+ * + * If any mapper function throws an exception, the returned future will be failed with this exception.

+ * + * @param successMapper the function mapping the success + * @param failureMapper the function mapping the failure + * @return the composed future + */ + Future compose(Function> successMapper, Function> failureMapper); - /** - * Compose this future with a {@code mapper} that will be always be called. - * - *

When this future (the one on which {@code eventually} is called) completes, the {@code mapper} will be called - * and this mapper returns another future object. This returned future completion will complete the future returned - * by this method call with the original result of the future. - * - *

The outcome of the future returned by the {@code mapper} will not influence the nature - * of the returned future. - * - * @param mapper the function returning the future. - * @return the composed future - */ - Future eventually(Function> mapper); + /** + * Transform this future with a {@code mapper} functions.

+ * + * When this future (the one on which {@code transform} is called) completes, the {@code mapper} will be called with + * the async result and this mapper returns another future object. This returned future completion will complete + * the future returned by this method call.

+ * + * If any mapper function throws an exception, the returned future will be failed with this exception.

+ * + * @param mapper the function mapping the future + * @return the transformed future + */ + Future transform(Function, Future> mapper); - /** - * Apply a {@code mapper} function on this future.

- * - * When this future succeeds, the {@code mapper} will be called with the completed value and this mapper - * returns a value. This value will complete the future returned by this method call.

- * - * If the {@code mapper} throws an exception, the returned future will be failed with this exception.

- * - * When this future fails, the failure will be propagated to the returned future and the {@code mapper} - * will not be called. - * - * @param mapper the mapper function - * @return the mapped future - */ - Future map(Function mapper); + /** + * Compose this future with a {@code mapper} that will be always be called. + * + *

When this future (the one on which {@code eventually} is called) completes, the {@code mapper} will be called + * and this mapper returns another future object. This returned future completion will complete the future returned + * by this method call with the original result of the future. + * + *

The outcome of the future returned by the {@code mapper} will not influence the nature + * of the returned future. + * + * @param mapper the function returning the future. + * @return the composed future + */ + Future eventually(Function> mapper); - /** - * Map the result of a future to a specific {@code value}.

- * - * When this future succeeds, this {@code value} will complete the future returned by this method call.

- * - * When this future fails, the failure will be propagated to the returned future. - * - * @param value the value that eventually completes the mapped future - * @return the mapped future - */ - Future map(V value); + /** + * Apply a {@code mapper} function on this future.

+ * + * When this future succeeds, the {@code mapper} will be called with the completed value and this mapper + * returns a value. This value will complete the future returned by this method call.

+ * + * If the {@code mapper} throws an exception, the returned future will be failed with this exception.

+ * + * When this future fails, the failure will be propagated to the returned future and the {@code mapper} + * will not be called. + * + * @param mapper the mapper function + * @return the mapped future + */ + Future map(Function mapper); - /** - * Map the result of a future to {@code null}.

- * - * This is a conveniency for {@code future.map((T) null)} or {@code future.map((Void) null)}.

- * - * When this future succeeds, {@code null} will complete the future returned by this method call.

- * - * When this future fails, the failure will be propagated to the returned future. - * - * @return the mapped future - */ - @Override - default Future mapEmpty() { - return (Future) AsyncResult.super.mapEmpty(); - } + /** + * Map the result of a future to a specific {@code value}.

+ * + * When this future succeeds, this {@code value} will complete the future returned by this method call.

+ * + * When this future fails, the failure will be propagated to the returned future. + * + * @param value the value that eventually completes the mapped future + * @return the mapped future + */ + Future map(V value); - /** - * Apply a {@code mapper} function on this future.

- * - * When this future fails, the {@code mapper} will be called with the completed value and this mapper - * returns a value. This value will complete the future returned by this method call.

- * - * If the {@code mapper} throws an exception, the returned future will be failed with this exception.

- * - * When this future succeeds, the result will be propagated to the returned future and the {@code mapper} - * will not be called. - * - * @param mapper the mapper function - * @return the mapped future - */ - Future otherwise(Function mapper); + /** + * Map the result of a future to {@code null}.

+ * + * This is a conveniency for {@code future.map((T) null)} or {@code future.map((Void) null)}.

+ * + * When this future succeeds, {@code null} will complete the future returned by this method call.

+ * + * When this future fails, the failure will be propagated to the returned future. + * + * @return the mapped future + */ + @Override + default Future mapEmpty() { + return (Future) AsyncResult.super.mapEmpty(); + } - /** - * Map the failure of a future to a specific {@code value}.

- * - * When this future fails, this {@code value} will complete the future returned by this method call.

- * - * When this future succeeds, the result will be propagated to the returned future. - * - * @param value the value that eventually completes the mapped future - * @return the mapped future - */ - Future otherwise(T value); + /** + * Apply a {@code mapper} function on this future.

+ * + * When this future fails, the {@code mapper} will be called with the completed value and this mapper + * returns a value. This value will complete the future returned by this method call.

+ * + * If the {@code mapper} throws an exception, the returned future will be failed with this exception.

+ * + * When this future succeeds, the result will be propagated to the returned future and the {@code mapper} + * will not be called. + * + * @param mapper the mapper function + * @return the mapped future + */ + Future otherwise(Function mapper); - /** - * Map the failure of a future to {@code null}.

- * - * This is a convenience for {@code future.otherwise((T) null)}.

- * - * When this future fails, the {@code null} value will complete the future returned by this method call.

- * - * When this future succeeds, the result will be propagated to the returned future. - * - * @return the mapped future - */ - default Future otherwiseEmpty() { - return (Future) AsyncResult.super.otherwiseEmpty(); - } + /** + * Map the failure of a future to a specific {@code value}.

+ * + * When this future fails, this {@code value} will complete the future returned by this method call.

+ * + * When this future succeeds, the result will be propagated to the returned future. + * + * @param value the value that eventually completes the mapped future + * @return the mapped future + */ + Future otherwise(T value); - /** - * Invokes the given {@code handler} upon completion. - *

- * If the {@code handler} throws an exception, the returned future will be failed with this exception. - * - * @param handler invoked upon completion of this future - * @return a future completed after the {@code handler} has been invoked - */ - default Future andThen(Handler> handler) { - return transform(ar -> { - handler.handle(ar); - return (Future) ar; - }); - } + /** + * Map the failure of a future to {@code null}.

+ * + * This is a convenience for {@code future.otherwise((T) null)}.

+ * + * When this future fails, the {@code null} value will complete the future returned by this method call.

+ * + * When this future succeeds, the result will be propagated to the returned future. + * + * @return the mapped future + */ + default Future otherwiseEmpty() { + return (Future) AsyncResult.super.otherwiseEmpty(); + } - /** - * Bridges this Vert.x future to a {@link CompletionStage} instance. - *

- * The {@link CompletionStage} handling methods will be called from the thread that resolves this future. - * - * @return a {@link CompletionStage} that completes when this future resolves - */ - default CompletionStage toCompletionStage() { - CompletableFuture completableFuture = new CompletableFuture<>(); - onComplete(ar -> { - if (ar.succeeded()) { - completableFuture.complete(ar.result()); - } else { - completableFuture.completeExceptionally(ar.cause()); - } - }); - return completableFuture; - } + /** + * Invokes the given {@code handler} upon completion. + *

+ * If the {@code handler} throws an exception, the returned future will be failed with this exception. + * + * @param handler invoked upon completion of this future + * @return a future completed after the {@code handler} has been invoked + */ + default Future andThen(Handler> handler) { + return transform(ar -> { + handler.handle(ar); + return (Future) ar; + }); + } - /** - * Bridges a {@link CompletionStage} object to a Vert.x future instance. - *

- * The Vert.x future handling methods will be called from the thread that completes {@code completionStage}. - * - * @param completionStage a completion stage - * @param the result type - * @return a Vert.x future that resolves when {@code completionStage} resolves - */ - static Future fromCompletionStage(CompletionStage completionStage) { - Promise promise = Promise.promise(); - completionStage.whenComplete((value, err) -> { - if (err != null) { - promise.fail(err); - } else { - promise.complete(value); - } - }); - return promise.future(); - } + /** + * Bridges this Vert.x future to a {@link CompletionStage} instance. + *

+ * The {@link CompletionStage} handling methods will be called from the thread that resolves this future. + * + * @return a {@link CompletionStage} that completes when this future resolves + */ + default CompletionStage toCompletionStage() { + CompletableFuture completableFuture = new CompletableFuture<>(); + onComplete(ar -> { + if (ar.succeeded()) { + completableFuture.complete(ar.result()); + } else { + completableFuture.completeExceptionally(ar.cause()); + } + }); + return completableFuture; + } - /** - * Bridges a {@link CompletionStage} object to a Vert.x future instance. - *

- * The Vert.x future handling methods will be called on the provided {@code context}. - * - * @param completionStage a completion stage - * @param context a Vert.x context to dispatch to - * @param the result type - * @return a Vert.x future that resolves when {@code completionStage} resolves - */ - static Future fromCompletionStage(CompletionStage completionStage, Context context) { - Promise promise = ((ContextInternal) context).promise(); - completionStage.whenComplete((value, err) -> { - if (err != null) { - promise.fail(err); - } else { - promise.complete(value); - } - }); - return promise.future(); - } + /** + * Bridges a {@link CompletionStage} object to a Vert.x future instance. + *

+ * The Vert.x future handling methods will be called from the thread that completes {@code completionStage}. + * + * @param completionStage a completion stage + * @param the result type + * @return a Vert.x future that resolves when {@code completionStage} resolves + */ + static Future fromCompletionStage(CompletionStage completionStage) { + Promise promise = Promise.promise(); + completionStage.whenComplete((value, err) -> { + if (err != null) { + promise.fail(err); + } else { + promise.complete(value); + } + }); + return promise.future(); + } + + /** + * Bridges a {@link CompletionStage} object to a Vert.x future instance. + *

+ * The Vert.x future handling methods will be called on the provided {@code context}. + * + * @param completionStage a completion stage + * @param context a Vert.x context to dispatch to + * @param the result type + * @return a Vert.x future that resolves when {@code completionStage} resolves + */ + static Future fromCompletionStage(CompletionStage completionStage, Context context) { + Promise promise = ((ContextInternal) context).promise(); + completionStage.whenComplete((value, err) -> { + if (err != null) { + promise.fail(err); + } else { + promise.complete(value); + } + }); + return promise.future(); + } } diff --git a/src/main/java/org/xbib/event/async/impl/future/CompositeFutureImpl.java b/src/main/java/org/xbib/event/async/impl/future/CompositeFutureImpl.java index 3deafbc..e3db579 100644 --- a/src/main/java/org/xbib/event/async/impl/future/CompositeFutureImpl.java +++ b/src/main/java/org/xbib/event/async/impl/future/CompositeFutureImpl.java @@ -9,181 +9,178 @@ import java.util.function.Function; public class CompositeFutureImpl extends FutureImpl implements CompositeFuture { - public static CompositeFuture all(Future... results) { - CompositeFutureImpl composite = new CompositeFutureImpl(results); - int len = results.length; - for (Future result : results) { - result.onComplete(ar -> { - if (ar.succeeded()) { - synchronized (composite) { - if (composite.count == len || ++composite.count != len) { - return; - } - } - composite.trySucceed(); - } else { - synchronized (composite) { - if (composite.count == len) { - return; - } - composite.count = len; - } - composite.tryFail(ar.cause()); + public static CompositeFuture all(Future... results) { + CompositeFutureImpl composite = new CompositeFutureImpl(results); + int len = results.length; + for (Future result : results) { + result.onComplete(ar -> { + if (ar.succeeded()) { + synchronized (composite) { + if (composite.count == len || ++composite.count != len) { + return; + } + } + composite.trySucceed(); + } else { + synchronized (composite) { + if (composite.count == len) { + return; + } + composite.count = len; + } + composite.tryFail(ar.cause()); + } + }); } - }); - } - if (len == 0) { - composite.trySucceed(); - } - return composite; - } - - public static CompositeFuture any(Future... results) { - CompositeFutureImpl composite = new CompositeFutureImpl(results); - int len = results.length; - for (Future result : results) { - result.onComplete(ar -> { - if (ar.succeeded()) { - synchronized (composite) { - if (composite.count == len) { - return; - } - composite.count = len; - } - composite.trySucceed(); - } else { - synchronized (composite) { - if (composite.count == len || ++composite.count != len) { - return; - } - } - composite.tryFail(ar.cause()); + if (len == 0) { + composite.trySucceed(); } - }); + return composite; } - if (results.length == 0) { - composite.trySucceed(); - } - return composite; - } - private static final Function ALL = cf -> { - int size = cf.size(); - for (int i = 0;i < size;i++) { - if (!cf.succeeded(i)) { - return cf.cause(i); - } - } - return cf; - }; - - public static CompositeFuture join(Future... results) { - return join(ALL, results); - } - - private static CompositeFuture join(Function pred, Future... results) { - CompositeFutureImpl composite = new CompositeFutureImpl(results); - int len = results.length; - for (Future result : results) { - result.onComplete(ar -> { - synchronized (composite) { - if (++composite.count < len) { - return; - } + public static CompositeFuture any(Future... results) { + CompositeFutureImpl composite = new CompositeFutureImpl(results); + int len = results.length; + for (Future result : results) { + result.onComplete(ar -> { + if (ar.succeeded()) { + synchronized (composite) { + if (composite.count == len) { + return; + } + composite.count = len; + } + composite.trySucceed(); + } else { + synchronized (composite) { + if (composite.count == len || ++composite.count != len) { + return; + } + } + composite.tryFail(ar.cause()); + } + }); } - composite.complete(pred.apply(composite)); - }); + if (results.length == 0) { + composite.trySucceed(); + } + return composite; } - if (len == 0) { - composite.trySucceed(); + + private static final Function ALL = cf -> { + int size = cf.size(); + for (int i = 0;i < size;i++) { + if (!cf.succeeded(i)) { + return cf.cause(i); + } + } + return cf; + }; + + public static CompositeFuture join(Future... results) { + return join(ALL, results); } - return composite; - } - private final Future[] results; - private int count; - - private CompositeFutureImpl(Future... results) { - this.results = results; - } - - @Override - public Throwable cause(int index) { - return future(index).cause(); - } - - @Override - public boolean succeeded(int index) { - return future(index).succeeded(); - } - - @Override - public boolean failed(int index) { - return future(index).failed(); - } - - @Override - public boolean isComplete(int index) { - return future(index).isComplete(); - } - - @Override - public T resultAt(int index) { - return this.future(index).result(); - } - - private Future future(int index) { - if (index < 0 || index >= results.length) { - throw new IndexOutOfBoundsException(); + private static CompositeFuture join(Function pred, Future... results) { + CompositeFutureImpl composite = new CompositeFutureImpl(results); + int len = results.length; + for (Future result : results) { + result.onComplete(ar -> { + synchronized (composite) { + if (++composite.count < len) { + return; + } + } + composite.complete(pred.apply(composite)); + }); + } + if (len == 0) { + composite.trySucceed(); + } + return composite; } - return (Future) results[index]; - } - @Override - public int size() { - return results.length; - } + private final Future[] results; + private int count; - private void trySucceed() { - tryComplete(this); - } - - private void fail(Throwable t) { - complete(t); - } - - private void complete(Object result) { - if (result == this) { - tryComplete(this); - } else if (result instanceof Throwable) { - tryFail((Throwable) result); + private CompositeFutureImpl(Future... results) { + this.results = results; } - } - @Override - public CompositeFuture onComplete(Handler> handler) { - return (CompositeFuture)super.onComplete(handler); - } - - @Override - public CompositeFuture onSuccess(Handler handler) { - return (CompositeFuture)super.onSuccess(handler); - } - - @Override - public CompositeFuture onFailure(Handler handler) { - return (CompositeFuture)super.onFailure(handler); - } - - @Override - protected void formatValue(Object value, StringBuilder sb) { - sb.append('('); - for (int i = 0;i < results.length;i++) { - if (i > 0) { - sb.append(','); - } - sb.append(results[i]); + @Override + public Throwable cause(int index) { + return future(index).cause(); + } + + @Override + public boolean succeeded(int index) { + return future(index).succeeded(); + } + + @Override + public boolean failed(int index) { + return future(index).failed(); + } + + @Override + public boolean isComplete(int index) { + return future(index).isComplete(); + } + + @Override + public T resultAt(int index) { + return this.future(index).result(); + } + + @SuppressWarnings("unchecked") + private Future future(int index) { + if (index < 0 || index >= results.length) { + throw new IndexOutOfBoundsException(); + } + return (Future) results[index]; + } + + @Override + public int size() { + return results.length; + } + + private void trySucceed() { + tryComplete(this); + } + + private void complete(Object result) { + if (result == this) { + tryComplete(this); + } else if (result instanceof Throwable) { + tryFail((Throwable) result); + } + } + + @Override + public CompositeFuture onComplete(Handler> handler) { + return (CompositeFuture)super.onComplete(handler); + } + + @Override + public CompositeFuture onSuccess(Handler handler) { + return (CompositeFuture)super.onSuccess(handler); + } + + @Override + public CompositeFuture onFailure(Handler handler) { + return (CompositeFuture)super.onFailure(handler); + } + + @Override + protected void formatValue(Object value, StringBuilder sb) { + sb.append('('); + for (int i = 0;i < results.length;i++) { + if (i > 0) { + sb.append(','); + } + sb.append(results[i]); + } + sb.append(')'); } - sb.append(')'); - } } diff --git a/src/main/java/org/xbib/event/async/impl/future/FailedFuture.java b/src/main/java/org/xbib/event/async/impl/future/FailedFuture.java index 9bc800d..9354869 100644 --- a/src/main/java/org/xbib/event/async/impl/future/FailedFuture.java +++ b/src/main/java/org/xbib/event/async/impl/future/FailedFuture.java @@ -13,115 +13,118 @@ import java.util.function.Function; */ public final class FailedFuture extends FutureBase { - private final Throwable cause; + private final Throwable cause; - /** - * Create a future that has already failed - * @param t the throwable - */ - public FailedFuture(Throwable t) { - this(null, t); - } - - /** - * Create a future that has already failed - * @param t the throwable - */ - public FailedFuture(ContextInternal context, Throwable t) { - super(context); - this.cause = t != null ? t : new NoStackTraceThrowable(null); - } - - /** - * Create a future that has already failed - * @param failureMessage the failure message - */ - public FailedFuture(String failureMessage) { - this(null, failureMessage); - } - - /** - * Create a future that has already failed - * @param failureMessage the failure message - */ - public FailedFuture(ContextInternal context, String failureMessage) { - this(context, new NoStackTraceThrowable(failureMessage)); - } - - @Override - public boolean isComplete() { - return true; - } - - @Override - public Future onComplete(Handler> handler) { - if (handler instanceof Listener) { - emitFailure(cause, (Listener) handler); - } else if (context != null) { - context.emit(this, handler); - } else { - handler.handle(this); + /** + * Create a future that has already failed + * @param t the throwable + */ + public FailedFuture(Throwable t) { + this(null, t); } - return this; - } - @Override - public Future onSuccess(Handler handler) { - return this; - } - - @Override - public Future onFailure(Handler handler) { - if (context != null) { - context.emit(cause, handler); - } else { - handler.handle(cause); + /** + * Create a future that has already failed + * @param t the throwable + */ + public FailedFuture(ContextInternal context, Throwable t) { + super(context); + this.cause = t != null ? t : new NoStackTraceThrowable(null); } - return this; - } - @Override - public void addListener(Listener listener) { - emitFailure(cause, listener); - } + /** + * Create a future that has already failed + * @param failureMessage the failure message + */ + public FailedFuture(String failureMessage) { + this(null, failureMessage); + } - @Override - public T result() { - return null; - } + /** + * Create a future that has already failed + * @param failureMessage the failure message + */ + public FailedFuture(ContextInternal context, String failureMessage) { + this(context, new NoStackTraceThrowable(failureMessage)); + } - @Override - public Throwable cause() { - return cause; - } + @Override + public boolean isComplete() { + return true; + } - @Override - public boolean succeeded() { - return false; - } + @SuppressWarnings("unchecked") + @Override + public Future onComplete(Handler> handler) { + if (handler instanceof Listener) { + emitFailure(cause, (Listener) handler); + } else if (context != null) { + context.emit(this, handler); + } else { + handler.handle(this); + } + return this; + } - @Override - public boolean failed() { - return true; - } + @Override + public Future onSuccess(Handler handler) { + return this; + } - @Override - public Future map(Function mapper) { - return (Future) this; - } + @Override + public Future onFailure(Handler handler) { + if (context != null) { + context.emit(cause, handler); + } else { + handler.handle(cause); + } + return this; + } - @Override - public Future map(V value) { - return (Future) this; - } + @Override + public void addListener(Listener listener) { + emitFailure(cause, listener); + } - @Override - public Future otherwise(T value) { - return new SucceededFuture<>(context, value); - } + @Override + public T result() { + return null; + } - @Override - public String toString() { - return "Future{cause=" + cause.getMessage() + "}"; - } + @Override + public Throwable cause() { + return cause; + } + + @Override + public boolean succeeded() { + return false; + } + + @Override + public boolean failed() { + return true; + } + + @SuppressWarnings("unchecked") + @Override + public Future map(Function mapper) { + return (Future) this; + } + + @SuppressWarnings("unchecked") + @Override + public Future map(V value) { + return (Future) this; + } + + @Override + public Future otherwise(T value) { + return new SucceededFuture<>(context, value); + } + + @Override + public String toString() { + return "Future{cause=" + cause.getMessage() + "}"; + } } diff --git a/src/main/java/org/xbib/event/async/impl/future/FutureImpl.java b/src/main/java/org/xbib/event/async/impl/future/FutureImpl.java index 2626efc..6eda1dc 100644 --- a/src/main/java/org/xbib/event/async/impl/future/FutureImpl.java +++ b/src/main/java/org/xbib/event/async/impl/future/FutureImpl.java @@ -14,255 +14,259 @@ import java.util.Objects; */ class FutureImpl extends FutureBase { - private static final Object NULL_VALUE = new Object(); + private static final Object NULL_VALUE = new Object(); - private Object value; - private Listener listener; + private Object value; + private Listener listener; - /** - * Create a future that hasn't completed yet - */ - FutureImpl() { - super(); - } + /** + * Create a future that hasn't completed yet + */ + FutureImpl() { + super(); + } - /** - * Create a future that hasn't completed yet - */ - FutureImpl(ContextInternal context) { - super(context); - } + /** + * Create a future that hasn't completed yet + */ + FutureImpl(ContextInternal context) { + super(context); + } - /** - * The result of the operation. This will be null if the operation failed. - */ - public synchronized T result() { - return value instanceof CauseHolder ? null : value == NULL_VALUE ? null : (T) value; - } + /** + * The result of the operation. This will be null if the operation failed. + */ + @SuppressWarnings("unchecked") + public synchronized T result() { + return value instanceof CauseHolder ? null : value == NULL_VALUE ? null : (T) value; + } - /** - * An exception describing failure. This will be null if the operation succeeded. - */ - public synchronized Throwable cause() { - return value instanceof CauseHolder ? ((CauseHolder)value).cause : null; - } + /** + * An exception describing failure. This will be null if the operation succeeded. + */ + public synchronized Throwable cause() { + return value instanceof CauseHolder ? ((CauseHolder)value).cause : null; + } - /** - * Did it succeed? - */ - public synchronized boolean succeeded() { - return value != null && !(value instanceof CauseHolder); - } + /** + * Did it succeed? + */ + public synchronized boolean succeeded() { + return value != null && !(value instanceof CauseHolder); + } - /** - * Did it fail? - */ - public synchronized boolean failed() { - return value instanceof CauseHolder; - } + /** + * Did it fail? + */ + public synchronized boolean failed() { + return value instanceof CauseHolder; + } - /** - * Has it completed? - */ - public synchronized boolean isComplete() { - return value != null; - } + /** + * Has it completed? + */ + public synchronized boolean isComplete() { + return value != null; + } - @Override - public Future onSuccess(Handler handler) { - Objects.requireNonNull(handler, "No null handler accepted"); - addListener(new Listener() { - @Override - public void onSuccess(T value) { - try { - handler.handle(value); - } catch (Throwable t) { - if (context != null) { - context.reportException(t); - } else { - throw t; - } + @Override + public Future onSuccess(Handler handler) { + Objects.requireNonNull(handler, "No null handler accepted"); + addListener(new Listener() { + @Override + public void onSuccess(T value) { + try { + handler.handle(value); + } catch (Throwable t) { + if (context != null) { + context.reportException(t); + } else { + throw t; + } + } + } + @Override + public void onFailure(Throwable failure) { + } + }); + return this; + } + + @Override + public Future onFailure(Handler handler) { + Objects.requireNonNull(handler, "No null handler accepted"); + addListener(new Listener() { + @Override + public void onSuccess(T value) { + } + @Override + public void onFailure(Throwable failure) { + try { + handler.handle(failure); + } catch (Throwable t) { + if (context != null) { + context.reportException(t); + } else { + throw t; + } + } + } + }); + return this; + } + + @SuppressWarnings("unchecked") + @Override + public Future onComplete(Handler> handler) { + Objects.requireNonNull(handler, "No null handler accepted"); + Listener listener; + if (handler instanceof Listener) { + listener = (Listener) handler; + } else { + listener = new Listener() { + @Override + public void onSuccess(T value) { + try { + handler.handle(FutureImpl.this); + } catch (Throwable t) { + if (context != null) { + context.reportException(t); + } else { + throw t; + } + } + } + @Override + public void onFailure(Throwable failure) { + try { + handler.handle(FutureImpl.this); + } catch (Throwable t) { + if (context != null) { + context.reportException(t); + } else { + throw t; + } + } + } + }; } - } - @Override - public void onFailure(Throwable failure) { - } - }); - return this; - } + addListener(listener); + return this; + } - @Override - public Future onFailure(Handler handler) { - Objects.requireNonNull(handler, "No null handler accepted"); - addListener(new Listener() { - @Override - public void onSuccess(T value) { - } - @Override - public void onFailure(Throwable failure) { - try { - handler.handle(failure); - } catch (Throwable t) { - if (context != null) { - context.reportException(t); - } else { - throw t; - } + @SuppressWarnings("unchecked") + @Override + public void addListener(Listener listener) { + Object v; + synchronized (this) { + v = value; + if (v == null) { + if (this.listener == null) { + this.listener = listener; + } else { + ListenerArray listeners; + if (this.listener instanceof FutureImpl.ListenerArray) { + listeners = (ListenerArray) this.listener; + } else { + listeners = new ListenerArray<>(); + listeners.add(this.listener); + this.listener = listeners; + } + listeners.add(listener); + } + return; + } } - } - }); - return this; - } + if (v instanceof CauseHolder) { + emitFailure(((CauseHolder)v).cause, listener); + } else { + if (v == NULL_VALUE) { + v = null; + } + emitSuccess((T) v, listener); + } + } - @Override - public Future onComplete(Handler> handler) { - Objects.requireNonNull(handler, "No null handler accepted"); - Listener listener; - if (handler instanceof Listener) { - listener = (Listener) handler; - } else { - listener = new Listener() { + public boolean tryComplete(T result) { + Listener l; + synchronized (this) { + if (value != null) { + return false; + } + value = result == null ? NULL_VALUE : result; + l = listener; + listener = null; + } + if (l != null) { + emitSuccess(result, l); + } + return true; + } + + public boolean tryFail(Throwable cause) { + if (cause == null) { + cause = new NoStackTraceThrowable(null); + } + Listener l; + synchronized (this) { + if (value != null) { + return false; + } + value = new CauseHolder(cause); + l = listener; + listener = null; + } + if (l != null) { + emitFailure(cause, l); + } + return true; + } + + @Override + public String toString() { + synchronized (this) { + if (value instanceof CauseHolder) { + return "Future{cause=" + ((CauseHolder)value).cause.getMessage() + "}"; + } + if (value != null) { + if (value == NULL_VALUE) { + return "Future{result=null}"; + } + StringBuilder sb = new StringBuilder("Future{result="); + formatValue(value, sb); + sb.append("}"); + return sb.toString(); + } + return "Future{unresolved}"; + } + } + + protected void formatValue(Object value, StringBuilder sb) { + sb.append(value); + } + + @SuppressWarnings("serial") + private static class ListenerArray extends ArrayList> implements Listener { @Override public void onSuccess(T value) { - try { - handler.handle(FutureImpl.this); - } catch (Throwable t) { - if (context != null) { - context.reportException(t); - } else { - throw t; + for (Listener handler : this) { + handler.onSuccess(value); } - } } @Override public void onFailure(Throwable failure) { - try { - handler.handle(FutureImpl.this); - } catch (Throwable t) { - if (context != null) { - context.reportException(t); - } else { - throw t; + for (Listener handler : this) { + handler.onFailure(failure); } - } } - }; } - addListener(listener); - return this; - } - @Override - public void addListener(Listener listener) { - Object v; - synchronized (this) { - v = value; - if (v == null) { - if (this.listener == null) { - this.listener = listener; - } else { - ListenerArray listeners; - if (this.listener instanceof FutureImpl.ListenerArray) { - listeners = (ListenerArray) this.listener; - } else { - listeners = new ListenerArray<>(); - listeners.add(this.listener); - this.listener = listeners; - } - listeners.add(listener); + private static class CauseHolder { + + private final Throwable cause; + + CauseHolder(Throwable cause) { + this.cause = cause; } - return; - } } - if (v instanceof CauseHolder) { - emitFailure(((CauseHolder)v).cause, listener); - } else { - if (v == NULL_VALUE) { - v = null; - } - emitSuccess((T) v, listener); - } - } - - public boolean tryComplete(T result) { - Listener l; - synchronized (this) { - if (value != null) { - return false; - } - value = result == null ? NULL_VALUE : result; - l = listener; - listener = null; - } - if (l != null) { - emitSuccess(result, l); - } - return true; - } - - public boolean tryFail(Throwable cause) { - if (cause == null) { - cause = new NoStackTraceThrowable(null); - } - Listener l; - synchronized (this) { - if (value != null) { - return false; - } - value = new CauseHolder(cause); - l = listener; - listener = null; - } - if (l != null) { - emitFailure(cause, l); - } - return true; - } - - @Override - public String toString() { - synchronized (this) { - if (value instanceof CauseHolder) { - return "Future{cause=" + ((CauseHolder)value).cause.getMessage() + "}"; - } - if (value != null) { - if (value == NULL_VALUE) { - return "Future{result=null}"; - } - StringBuilder sb = new StringBuilder("Future{result="); - formatValue(value, sb); - sb.append("}"); - return sb.toString(); - } - return "Future{unresolved}"; - } - } - - protected void formatValue(Object value, StringBuilder sb) { - sb.append(value); - } - - private static class ListenerArray extends ArrayList> implements Listener { - @Override - public void onSuccess(T value) { - for (Listener handler : this) { - handler.onSuccess(value); - } - } - @Override - public void onFailure(Throwable failure) { - for (Listener handler : this) { - handler.onFailure(failure); - } - } - } - - private static class CauseHolder { - - private final Throwable cause; - - CauseHolder(Throwable cause) { - this.cause = cause; - } - } } diff --git a/src/main/java/org/xbib/event/async/impl/future/SucceededFuture.java b/src/main/java/org/xbib/event/async/impl/future/SucceededFuture.java index 1987192..521e431 100644 --- a/src/main/java/org/xbib/event/async/impl/future/SucceededFuture.java +++ b/src/main/java/org/xbib/event/async/impl/future/SucceededFuture.java @@ -13,106 +13,106 @@ import java.util.function.Function; */ public final class SucceededFuture extends FutureBase { - /** - * Stateless instance of empty results that can be shared safely. - */ - public static final SucceededFuture EMPTY = new SucceededFuture(null, null); + /** + * Stateless instance of empty results that can be shared safely. + */ + public static final SucceededFuture EMPTY = new SucceededFuture<>(null, null); - private final T result; + private final T result; - /** - * Create a future that has already succeeded - * @param result the result - */ - public SucceededFuture(T result) { - this(null, result); - } - - /** - * Create a future that has already succeeded - * @param context the context - * @param result the result - */ - public SucceededFuture(ContextInternal context, T result) { - super(context); - this.result = result; - } - - @Override - public boolean isComplete() { - return true; - } - - @Override - public Future onSuccess(Handler handler) { - if (context != null) { - context.emit(result, handler); - } else { - handler.handle(result); + /** + * Create a future that has already succeeded + * @param result the result + */ + public SucceededFuture(T result) { + this(null, result); } - return this; - } - @Override - public Future onFailure(Handler handler) { - return this; - } - - @Override - public Future onComplete(Handler> handler) { - if (handler instanceof Listener) { - emitSuccess(result ,(Listener) handler); - } else if (context != null) { - context.emit(this, handler); - } else { - handler.handle(this); + /** + * Create a future that has already succeeded + * @param context the context + * @param result the result + */ + public SucceededFuture(ContextInternal context, T result) { + super(context); + this.result = result; } - return this; - } - @Override - public void addListener(Listener listener) { - emitSuccess(result ,listener); - } + @Override + public boolean isComplete() { + return true; + } - @Override - public T result() { - return result; - } + @Override + public Future onSuccess(Handler handler) { + if (context != null) { + context.emit(result, handler); + } else { + handler.handle(result); + } + return this; + } - @Override - public Throwable cause() { - return null; - } + @Override + public Future onFailure(Handler handler) { + return this; + } - @Override - public boolean succeeded() { - return true; - } + @Override + public Future onComplete(Handler> handler) { + if (handler instanceof Listener) { + emitSuccess(result ,(Listener) handler); + } else if (context != null) { + context.emit(this, handler); + } else { + handler.handle(this); + } + return this; + } - @Override - public boolean failed() { - return false; - } + @Override + public void addListener(Listener listener) { + emitSuccess(result ,listener); + } - @Override - public Future map(V value) { - return new SucceededFuture<>(context, value); - } + @Override + public T result() { + return result; + } - @Override - public Future otherwise(Function mapper) { - Objects.requireNonNull(mapper, "No null mapper accepted"); - return this; - } + @Override + public Throwable cause() { + return null; + } - @Override - public Future otherwise(T value) { - return this; - } + @Override + public boolean succeeded() { + return true; + } - @Override - public String toString() { - return "Future{result=" + result + "}"; - } + @Override + public boolean failed() { + return false; + } + + @Override + public Future map(V value) { + return new SucceededFuture<>(context, value); + } + + @Override + public Future otherwise(Function mapper) { + Objects.requireNonNull(mapper, "No null mapper accepted"); + return this; + } + + @Override + public Future otherwise(T value) { + return this; + } + + @Override + public String toString() { + return "Future{result=" + result + "}"; + } } diff --git a/src/main/java/org/xbib/event/bus/Dispatcher.java b/src/main/java/org/xbib/event/bus/Dispatcher.java index 3923351..d71adfb 100644 --- a/src/main/java/org/xbib/event/bus/Dispatcher.java +++ b/src/main/java/org/xbib/event/bus/Dispatcher.java @@ -85,7 +85,6 @@ public abstract class Dispatcher { Objects.requireNonNull(subscribers); Queue queueForThread = queue.get(); queueForThread.offer(new Event(event, subscribers)); - if (!dispatching.get()) { dispatching.set(true); try { diff --git a/src/main/java/org/xbib/event/bus/EventBus.java b/src/main/java/org/xbib/event/bus/EventBus.java index 9adee94..785138f 100644 --- a/src/main/java/org/xbib/event/bus/EventBus.java +++ b/src/main/java/org/xbib/event/bus/EventBus.java @@ -1,11 +1,9 @@ package org.xbib.event.bus; -import java.lang.reflect.Method; import java.util.Iterator; import java.util.Locale; import java.util.Objects; import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.logging.Level; import java.util.logging.Logger; @@ -134,6 +132,10 @@ public class EventBus { return identifier; } + public SubscriberRegistry getSubscribers() { + return subscribers; + } + /** Returns the default executor this event bus uses for dispatching events to subscribers. */ final Executor executor() { return executor; @@ -183,7 +185,7 @@ public class EventBus { * @param event event to post. */ public void post(Object event) { - Iterator eventSubscribers = subscribers.getSubscribers(event); + Iterator eventSubscribers = subscribers.getIterator(event); if (eventSubscribers.hasNext()) { dispatcher.dispatch(event, eventSubscribers); } else if (!(event instanceof DeadEvent)) { @@ -197,33 +199,4 @@ public class EventBus { return "EventBus{" + identifier + "}"; } - /** Simple logging handler for subscriber exceptions. */ - static final class LoggingHandler implements SubscriberExceptionHandler { - static final LoggingHandler INSTANCE = new LoggingHandler(); - - @Override - public void handleException(Throwable exception, SubscriberExceptionContext context) { - Logger logger = logger(context); - if (logger.isLoggable(Level.SEVERE)) { - logger.log(Level.SEVERE, message(context), exception); - } - } - - private static Logger logger(SubscriberExceptionContext context) { - return Logger.getLogger(EventBus.class.getName() + "." + context.getEventBus().identifier()); - } - - private static String message(SubscriberExceptionContext context) { - Method method = context.getSubscriberMethod(); - return "Exception thrown by subscriber method " - + method.getName() - + '(' - + method.getParameterTypes()[0].getName() - + ')' - + " on subscriber " - + context.getSubscriber() - + " when dispatching event: " - + context.getEvent(); - } - } } diff --git a/src/main/java/org/xbib/event/bus/LoggingHandler.java b/src/main/java/org/xbib/event/bus/LoggingHandler.java new file mode 100644 index 0000000..5cb97d3 --- /dev/null +++ b/src/main/java/org/xbib/event/bus/LoggingHandler.java @@ -0,0 +1,37 @@ +package org.xbib.event.bus; + +import java.lang.reflect.Method; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Simple logging handler for subscriber exceptions. + */ +final class LoggingHandler implements SubscriberExceptionHandler { + static final LoggingHandler INSTANCE = new LoggingHandler(); + + @Override + public void handleException(Throwable exception, SubscriberExceptionContext context) { + Logger logger = logger(context); + if (logger.isLoggable(Level.SEVERE)) { + logger.log(Level.SEVERE, message(context), exception); + } + } + + private static Logger logger(SubscriberExceptionContext context) { + return Logger.getLogger(EventBus.class.getName() + "." + context.getEventBus().identifier()); + } + + private static String message(SubscriberExceptionContext context) { + Method method = context.getSubscriberMethod(); + return "Exception thrown by subscriber method " + + method.getName() + + '(' + + method.getParameterTypes()[0].getName() + + ')' + + " on subscriber " + + context.getSubscriber() + + " when dispatching event: " + + context.getEvent(); + } +} diff --git a/src/main/java/org/xbib/event/bus/SubscriberRegistry.java b/src/main/java/org/xbib/event/bus/SubscriberRegistry.java index f1f93d8..a5a0ec4 100644 --- a/src/main/java/org/xbib/event/bus/SubscriberRegistry.java +++ b/src/main/java/org/xbib/event/bus/SubscriberRegistry.java @@ -23,7 +23,7 @@ import org.xbib.event.bus.util.TypeToken; /** * Registry of subscribers to a single event bus. */ -final class SubscriberRegistry { +public final class SubscriberRegistry { /** * All registered subscribers, indexed by event type. @@ -43,7 +43,6 @@ final class SubscriberRegistry { /** Registers all subscriber methods on the given listener object. */ void register(Object listener) { - MultiMap, Subscriber> listenerMethods = findAllSubscribers(listener); for (Entry, Collection> entry : listenerMethods.asMap().entrySet()) { Class eventType = entry.getKey(); @@ -53,7 +52,6 @@ final class SubscriberRegistry { CopyOnWriteArraySet newSet = new CopyOnWriteArraySet<>(); eventSubscribers = firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); } - eventSubscribers.addAll(eventMethodsInListener); } } @@ -73,13 +71,12 @@ final class SubscriberRegistry { throw new IllegalArgumentException( "missing event subscriber for an annotated method. Is " + listener + " registered?"); } - // don't try to remove the set if it's empty; that can't be done safely without a lock // anyway, if the set is empty it'll just be wrapping an array of length 0 } } - Set getSubscribersForTesting(Class eventType) { + public Set getSubscribersForTesting(Class eventType) { return firstNonNull(subscribers.get(eventType), new LinkedHashSet<>()); } @@ -87,7 +84,7 @@ final class SubscriberRegistry { * Gets an iterator representing an immutable snapshot of all subscribers to the given event at * the time this method is called. */ - Iterator getSubscribers(Object event) { + Iterator getIterator(Object event) { Set> eventTypes = flattenHierarchy(event.getClass()); List> subscriberIterators = new ArrayList<>(eventTypes.size()); for (Class eventType : eventTypes) { diff --git a/src/main/java/org/xbib/event/clock/SimpleClockEventConsumer.java b/src/main/java/org/xbib/event/clock/SimpleClockEventConsumer.java index 30c8624..9c9fbc2 100644 --- a/src/main/java/org/xbib/event/clock/SimpleClockEventConsumer.java +++ b/src/main/java/org/xbib/event/clock/SimpleClockEventConsumer.java @@ -20,7 +20,4 @@ public class SimpleClockEventConsumer implements EventConsumer { logger.info("received demo clock event, instant = " + event.getInstant()); } - @Override - public void close() throws IOException { - } } diff --git a/src/main/java/org/xbib/event/generic/DefaultGenericEvent.java b/src/main/java/org/xbib/event/generic/DefaultGenericEvent.java new file mode 100644 index 0000000..5195538 --- /dev/null +++ b/src/main/java/org/xbib/event/generic/DefaultGenericEvent.java @@ -0,0 +1,31 @@ +package org.xbib.event.generic; + +import org.xbib.event.DefaultEvent; + +public class DefaultGenericEvent extends DefaultEvent implements GenericEvent { + + private Listener listener; + + public DefaultGenericEvent() { + this(null); + } + + public DefaultGenericEvent(Listener listener) { + this.listener = listener; + } + + public DefaultGenericEvent setListener(Listener listener) { + this.listener = listener; + return this; + } + + public Listener getListener() { + return listener; + } + + public void received() { + if (listener != null) { + listener.listen(this); + } + } +} diff --git a/src/main/java/org/xbib/event/generic/GenericEvent.java b/src/main/java/org/xbib/event/generic/GenericEvent.java new file mode 100644 index 0000000..033ccca --- /dev/null +++ b/src/main/java/org/xbib/event/generic/GenericEvent.java @@ -0,0 +1,6 @@ +package org.xbib.event.generic; + +import org.xbib.event.Event; + +public interface GenericEvent extends Event { +} diff --git a/src/main/java/org/xbib/event/generic/GenericEventManager.java b/src/main/java/org/xbib/event/generic/GenericEventManager.java index b541de8..b899dd5 100644 --- a/src/main/java/org/xbib/event/generic/GenericEventManager.java +++ b/src/main/java/org/xbib/event/generic/GenericEventManager.java @@ -1,9 +1,13 @@ package org.xbib.event.generic; -import java.io.Closeable; -import org.xbib.event.bus.AsyncEventBus; +import java.util.Set; +import java.util.concurrent.CompletableFuture; -public class GenericEventManager implements Closeable { +import org.xbib.event.bus.AsyncEventBus; +import org.xbib.event.bus.Subscriber; +import org.xbib.event.bus.SubscriberRegistry; + +public class GenericEventManager { private final AsyncEventBus eventBus; @@ -15,7 +19,36 @@ public class GenericEventManager implements Closeable { eventBus.post(event); } - @Override - public void close() { + public void post(DefaultGenericEvent event, + CompletableFuture future) { + SubscriberRegistry subscriberRegistry = eventBus.getSubscribers(); + Set set = subscriberRegistry.getSubscribersForTesting(event.getClass()); + event.setListener(new WrappedListener(event.getListener(), set.size(), future)); + post(event); + } + + static class WrappedListener implements Listener { + + private final Listener listener; + + private int size; + + private final CompletableFuture future; + + public WrappedListener(Listener listener, int size, CompletableFuture future) { + this.listener = listener; + this.size = size; + this.future = future; + } + + @Override + public void listen(GenericEvent event) { + if (listener != null) { + listener.listen(event); + } + if (--size == 0) { + future.complete(true); + } + } } } diff --git a/src/main/java/org/xbib/event/generic/Listener.java b/src/main/java/org/xbib/event/generic/Listener.java new file mode 100644 index 0000000..20a70aa --- /dev/null +++ b/src/main/java/org/xbib/event/generic/Listener.java @@ -0,0 +1,7 @@ +package org.xbib.event.generic; + +@FunctionalInterface +public interface Listener { + + void listen(GenericEvent event); +} diff --git a/src/main/java/org/xbib/event/io/EmptySubscriber.java b/src/main/java/org/xbib/event/io/EmptySubscriber.java index cf99de3..bafd59e 100644 --- a/src/main/java/org/xbib/event/io/EmptySubscriber.java +++ b/src/main/java/org/xbib/event/io/EmptySubscriber.java @@ -10,6 +10,10 @@ import org.reactivestreams.Subscription; * @param */ public class EmptySubscriber implements org.reactivestreams.Subscriber { + + public EmptySubscriber() { + } + @Override public void onSubscribe(Subscription subscription) { } diff --git a/src/main/java/org/xbib/event/io/file/DefaultFileFollowEvent.java b/src/main/java/org/xbib/event/io/file/DefaultFileFollowEvent.java index 3250895..553561d 100644 --- a/src/main/java/org/xbib/event/io/file/DefaultFileFollowEvent.java +++ b/src/main/java/org/xbib/event/io/file/DefaultFileFollowEvent.java @@ -10,6 +10,9 @@ public class DefaultFileFollowEvent extends DefaultEvent implements FileFollowEv private String content; + public DefaultFileFollowEvent() { + } + @Override public void setPath(Path path) { this.path = path; diff --git a/src/main/java/org/xbib/event/loop/AbstractEventExecutorGroup.java b/src/main/java/org/xbib/event/loop/AbstractEventExecutorGroup.java index bc24b69..6b2c8d0 100644 --- a/src/main/java/org/xbib/event/loop/AbstractEventExecutorGroup.java +++ b/src/main/java/org/xbib/event/loop/AbstractEventExecutorGroup.java @@ -15,6 +15,10 @@ import java.util.concurrent.TimeoutException; * Abstract base class for {@link EventExecutorGroup} implementations. */ public abstract class AbstractEventExecutorGroup implements EventExecutorGroup { + + protected AbstractEventExecutorGroup() { + } + @Override public Future submit(Runnable task) { return next().submit(task); diff --git a/src/main/java/org/xbib/event/loop/SingleThreadEventExecutor.java b/src/main/java/org/xbib/event/loop/SingleThreadEventExecutor.java index 07bd9e3..513d820 100644 --- a/src/main/java/org/xbib/event/loop/SingleThreadEventExecutor.java +++ b/src/main/java/org/xbib/event/loop/SingleThreadEventExecutor.java @@ -1097,7 +1097,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx @Override public long id() { - return t.getId(); + return t.threadId(); } @Override diff --git a/src/main/java/org/xbib/event/syslog/CEFMessageParser.java b/src/main/java/org/xbib/event/syslog/CEFMessageParser.java index ac42de8..b2e02f9 100644 --- a/src/main/java/org/xbib/event/syslog/CEFMessageParser.java +++ b/src/main/java/org/xbib/event/syslog/CEFMessageParser.java @@ -7,7 +7,7 @@ import java.util.List; import java.util.Map; import java.util.regex.Matcher; -public class CEFMessageParser extends MessageParser { +public final class CEFMessageParser extends MessageParser { private static final String CEF_PREFIX_PATTERN = "^(<(?\\d+)>)?(?([a-zA-Z]{3}\\s+\\d+\\s+\\d+:\\d+:\\d+)|([0-9T:.Z-]+))\\s+(?\\S+)\\s+CEF:(?\\d+)\\|(?.*)$"; diff --git a/src/main/java/org/xbib/event/syslog/DefaultSyslogMessage.java b/src/main/java/org/xbib/event/syslog/DefaultSyslogMessage.java index 18a0bc7..6f2fe98 100644 --- a/src/main/java/org/xbib/event/syslog/DefaultSyslogMessage.java +++ b/src/main/java/org/xbib/event/syslog/DefaultSyslogMessage.java @@ -201,6 +201,9 @@ public class DefaultSyslogMessage implements SyslogMessage { Map map; + private Builder() { + } + public Builder date(LocalDateTime date) { this.date = date; return this; diff --git a/src/main/java/org/xbib/event/syslog/RFC3164MessageParser.java b/src/main/java/org/xbib/event/syslog/RFC3164MessageParser.java index c0c696d..a09727c 100644 --- a/src/main/java/org/xbib/event/syslog/RFC3164MessageParser.java +++ b/src/main/java/org/xbib/event/syslog/RFC3164MessageParser.java @@ -3,7 +3,7 @@ package org.xbib.event.syslog; import java.time.LocalDateTime; import java.util.regex.Matcher; -public class RFC3164MessageParser extends MessageParser { +public final class RFC3164MessageParser extends MessageParser { private static final String PATTERN = "^(<(?\\d+)>)?(?([a-zA-Z]{3}\\s+\\d+\\s+\\d+:\\d+:\\d+)|([0-9T:.Z-]+))\\s+(?\\S+)\\s+((?[^\\[\\s\\]]+)(\\[(?\\d+)\\])?:)*\\s*(?.+)$"; diff --git a/src/main/java/org/xbib/event/syslog/RFC5424MessageParser.java b/src/main/java/org/xbib/event/syslog/RFC5424MessageParser.java index 81e51fb..68f6566 100644 --- a/src/main/java/org/xbib/event/syslog/RFC5424MessageParser.java +++ b/src/main/java/org/xbib/event/syslog/RFC5424MessageParser.java @@ -7,7 +7,7 @@ import java.time.LocalDateTime; import java.util.List; import java.util.regex.Matcher; -public class RFC5424MessageParser extends MessageParser { +public final class RFC5424MessageParser extends MessageParser { private static final String PATTERN = "^<(?\\d+)>(?\\d{1,3})\\s*(?[0-9:+-TZ]+)\\s*(?\\S+)\\s*(?\\S+)\\s*(?\\S+)\\s*(?\\S+)\\s*(?(-|\\[.+\\]))\\s*(?.+)$"; diff --git a/src/main/java/org/xbib/event/thread/InternalThreadLocalMap.java b/src/main/java/org/xbib/event/thread/InternalThreadLocalMap.java index 08350dc..960b019 100644 --- a/src/main/java/org/xbib/event/thread/InternalThreadLocalMap.java +++ b/src/main/java/org/xbib/event/thread/InternalThreadLocalMap.java @@ -64,7 +64,6 @@ public final class InternalThreadLocalMap { private BitSet cleanerFlags; - /** @deprecated These padding fields will be removed in the future. */ public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8; static { diff --git a/src/test/java/org/xbib/event/EventManagerTest.java b/src/test/java/org/xbib/event/EventManagerTest.java new file mode 100644 index 0000000..3744d35 --- /dev/null +++ b/src/test/java/org/xbib/event/EventManagerTest.java @@ -0,0 +1,80 @@ +package org.xbib.event; + +import org.junit.jupiter.api.Test; +import org.xbib.event.bus.Subscribe; +import org.xbib.event.generic.DefaultGenericEvent; +import org.xbib.event.generic.GenericEvent; +import org.xbib.settings.Settings; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.logging.Level; +import java.util.logging.Logger; + +public class EventManagerTest { + + private static final Logger logger = Logger.getLogger(EventManagerTest.class.getName()); + + @Test + void testGenericEvents() { + Settings settings = Settings.settingsBuilder() + .build(); + TestEventConsumer consumer = new TestEventConsumer(); + EventManager eventManager = EventManager.builder(settings) + .register(consumer) + .build(); + eventManager.getGenericEventManager().post(new DefaultGenericEvent(e -> { + logger.log(Level.INFO, "received event " + e); + })); + } + + + @Test + void testGenericEventWithWaitForSubscriber() throws InterruptedException, ExecutionException { + Settings settings = Settings.settingsBuilder() + .build(); + TestEventConsumer consumer = new TestEventConsumer(); + EventManager eventManager = EventManager.builder(settings) + .register(consumer) + .build(); + CompletableFuture future = new CompletableFuture<>(); + eventManager.getGenericEventManager().post(new DefaultGenericEvent(e -> { + logger.log(Level.INFO, "received event " + e); + future.complete(e); + })); + GenericEvent e = future.get(); + logger.log(Level.INFO, "the event was received with result " + e + ", continuing"); + } + + @Test + void testGenericEventWithWaitForAllConsumers() throws ExecutionException, InterruptedException { + Settings settings = Settings.settingsBuilder() + .build(); + TestEventConsumer consumer1 = new TestEventConsumer(); + TestEventConsumer consumer2 = new TestEventConsumer(); + EventManager eventManager = EventManager.builder(settings) + .register(consumer1) + .register(consumer2) + .loadEventConsumers() + .build(); + CompletableFuture future = new CompletableFuture<>(); + eventManager.getGenericEventManager().post(new DefaultGenericEvent(e -> { + logger.log(Level.INFO, "received event " + e); + }), future); + Boolean b = future.get(); + if (b != null && b) { + logger.log(Level.INFO, "the event was received by all consumers, continuing"); + } + } + + private static class TestEventConsumer implements EventConsumer { + + TestEventConsumer() { + } + + @Subscribe + public void onEvent(DefaultGenericEvent event) { + event.received(); + } + } +} diff --git a/src/test/java/org/xbib/event/clock/TestClockEventConsumer.java b/src/test/java/org/xbib/event/clock/TestClockEventConsumer.java index 0578ba8..afacd21 100644 --- a/src/test/java/org/xbib/event/clock/TestClockEventConsumer.java +++ b/src/test/java/org/xbib/event/clock/TestClockEventConsumer.java @@ -18,8 +18,4 @@ public class TestClockEventConsumer implements EventConsumer { void onEvent(TestClockEvent event) { logger.log(Level.INFO, "received test clock event on " + Instant.now() + " event instant = " + event.getInstant()); } - - @Override - public void close() throws IOException { - } } diff --git a/src/test/java/org/xbib/event/io/file/TestFileFollowEventConsumer.java b/src/test/java/org/xbib/event/io/file/TestFileFollowEventConsumer.java index b183318..4833723 100644 --- a/src/test/java/org/xbib/event/io/file/TestFileFollowEventConsumer.java +++ b/src/test/java/org/xbib/event/io/file/TestFileFollowEventConsumer.java @@ -19,8 +19,4 @@ public class TestFileFollowEventConsumer implements EventConsumer { void onEvent(TestFileFollowEvent event) { logger.log(Level.INFO, "received filefollow event path = " + event.getPath() + " content = " + event.getContent()); } - - @Override - public void close() throws IOException { - } } diff --git a/src/test/java/org/xbib/event/timer/TestTimerEventConsumer.java b/src/test/java/org/xbib/event/timer/TestTimerEventConsumer.java index 85feb3b..7164633 100644 --- a/src/test/java/org/xbib/event/timer/TestTimerEventConsumer.java +++ b/src/test/java/org/xbib/event/timer/TestTimerEventConsumer.java @@ -19,7 +19,4 @@ public class TestTimerEventConsumer implements EventConsumer { logger.log(Level.INFO, "received test timer event on " + Instant.now() + " event instant = " + event.getInstant()); } - @Override - public void close() throws IOException { - } }