add event listener and future completion to generic event manager

This commit is contained in:
Jörg Prante 2023-12-11 11:38:35 +01:00
parent 3145720f40
commit ef0a26bbd1
42 changed files with 1424 additions and 1253 deletions

View file

@ -1,5 +1,5 @@
group = org.xbib group = org.xbib
name = event name = event
version = 0.0.7 version = 0.0.8
org.gradle.warning.mode = ALL org.gradle.warning.mode = ALL

Binary file not shown.

View file

@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.4-all.zip distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-all.zip
networkTimeout=10000 networkTimeout=10000
validateDistributionUrl=true validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME zipStoreBase=GRADLE_USER_HOME

View file

@ -15,11 +15,10 @@ pluginManagement {
dependencyResolutionManagement { dependencyResolutionManagement {
versionCatalogs { versionCatalogs {
libs { libs {
version('gradle', '8.4') version('gradle', '8.5')
version('groovy', '4.0.13')
version('datastructures', '5.0.5') version('datastructures', '5.0.5')
version('netty', '4.1.100.Final') version('netty', '4.1.101.Final')
version('net', '4.0.0') version('net', '4.0.2')
library('netty-handler', 'io.netty', 'netty-handler').versionRef('netty') library('netty-handler', 'io.netty', 'netty-handler').versionRef('netty')
library('net', 'org.xbib', 'net').versionRef('net') library('net', 'org.xbib', 'net').versionRef('net')
library('datastructures-common', 'org.xbib', 'datastructures-common').versionRef('datastructures') library('datastructures-common', 'org.xbib', 'datastructures-common').versionRef('datastructures')

View file

@ -34,6 +34,7 @@ public abstract class CompleteFuture<V> extends AbstractFuture<V> {
return this; return this;
} }
@SuppressWarnings("unchecked")
@Override @Override
public Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) { public Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
for (GenericFutureListener<? extends Future<? super V>> l: for (GenericFutureListener<? extends Future<? super V>> l:
@ -53,6 +54,7 @@ public abstract class CompleteFuture<V> extends AbstractFuture<V> {
return this; return this;
} }
@SuppressWarnings("unchecked")
@Override @Override
public Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) { public Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
// NOOP // NOOP

View file

@ -9,8 +9,7 @@ final class DefaultFutureListeners {
private int progressiveSize; // the number of progressive listeners private int progressiveSize; // the number of progressive listeners
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
DefaultFutureListeners( DefaultFutureListeners(GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
listeners = new GenericFutureListener[2]; listeners = new GenericFutureListener[2];
listeners[0] = first; listeners[0] = first;
listeners[1] = second; listeners[1] = second;

View file

@ -60,6 +60,7 @@ public class DefaultProgressivePromise<V> extends DefaultPromise<V> implements P
return this; return this;
} }
@SuppressWarnings("unchecked")
@Override @Override
public ProgressivePromise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) { public ProgressivePromise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
super.addListeners(listeners); super.addListeners(listeners);
@ -72,6 +73,7 @@ public class DefaultProgressivePromise<V> extends DefaultPromise<V> implements P
return this; return this;
} }
@SuppressWarnings("unchecked")
@Override @Override
public ProgressivePromise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) { public ProgressivePromise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
super.removeListeners(listeners); super.removeListeners(listeners);

View file

@ -170,6 +170,7 @@ public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
return this; return this;
} }
@SuppressWarnings("unchecked")
@Override @Override
public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) { public Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners) {
Objects.requireNonNull(listeners, "listeners"); Objects.requireNonNull(listeners, "listeners");

View file

@ -1,6 +1,4 @@
package org.xbib.event; package org.xbib.event;
import java.io.Closeable; public interface EventConsumer {
public interface EventConsumer extends Closeable {
} }

View file

@ -23,7 +23,7 @@ import java.util.concurrent.Executors;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
public final class EventManager implements Closeable { public final class EventManager {
private static final Logger logger = Logger.getLogger(EventManager.class.getName()); private static final Logger logger = Logger.getLogger(EventManager.class.getName());
@ -79,17 +79,12 @@ public final class EventManager implements Closeable {
return syslogEventManager; return syslogEventManager;
} }
@Override
public boolean equals(Object obj) {
return super.equals(obj);
}
@Override
public void close() throws IOException { public void close() throws IOException {
for (EventConsumer eventConsumer : builder.eventConsumers) { for (EventConsumer eventConsumer : builder.eventConsumers) {
eventConsumer.close(); if (eventConsumer instanceof Closeable closeable) {
closeable.close();
}
} }
genericEventManager.close();
clockEventManager.close(); clockEventManager.close();
timerEventManager.close(); timerEventManager.close();
fileFollowEventManager.close(); fileFollowEventManager.close();

View file

@ -11,10 +11,12 @@ public interface Future<V> extends java.util.concurrent.Future<V> {
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> var1); Future<V> addListener(GenericFutureListener<? extends Future<? super V>> var1);
@SuppressWarnings("unchecked")
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... var1); Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... var1);
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1); Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1);
@SuppressWarnings("unchecked")
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... var1); Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... var1);
Future<V> sync() throws InterruptedException; Future<V> sync() throws InterruptedException;

View file

@ -8,12 +8,14 @@ public interface ProgressiveFuture<V> extends Future<V> {
@Override @Override
ProgressiveFuture<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); ProgressiveFuture<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@SuppressWarnings("unchecked")
@Override @Override
ProgressiveFuture<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); ProgressiveFuture<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override @Override
ProgressiveFuture<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); ProgressiveFuture<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@SuppressWarnings("unchecked")
@Override @Override
ProgressiveFuture<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); ProgressiveFuture<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

View file

@ -27,12 +27,14 @@ public interface ProgressivePromise<V> extends Promise<V>, ProgressiveFuture<V>
@Override @Override
ProgressivePromise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); ProgressivePromise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@SuppressWarnings("unchecked")
@Override @Override
ProgressivePromise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners); ProgressivePromise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
@Override @Override
ProgressivePromise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener); ProgressivePromise<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
@SuppressWarnings("unchecked")
@Override @Override
ProgressivePromise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners); ProgressivePromise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);

View file

@ -13,10 +13,12 @@ public interface Promise<V> extends Future<V> {
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> value); Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> value);
@SuppressWarnings("unchecked")
Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... value); Promise<V> addListeners(GenericFutureListener<? extends Future<? super V>>... value);
Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> value); Promise<V> removeListener(GenericFutureListener<? extends Future<? super V>> value);
@SuppressWarnings("unchecked")
Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... value); Promise<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... value);
Promise<V> await() throws InterruptedException; Promise<V> await() throws InterruptedException;

View file

@ -13,233 +13,233 @@ import java.util.List;
*/ */
public interface CompositeFuture extends Future<CompositeFuture> { public interface CompositeFuture extends Future<CompositeFuture> {
/** /**
* Return a composite future, succeeded when all futures are succeeded, failed when any future is failed. * Return a composite future, succeeded when all futures are succeeded, failed when any future is failed.
* <p/> * <p/>
* The returned future fails as soon as one of {@code f1} or {@code f2} fails. * The returned future fails as soon as one of {@code f1} or {@code f2} fails.
* *
* @param f1 future * @param f1 future
* @param f2 future * @param f2 future
* @return the composite future * @return the composite future
*/ */
static <T1, T2> CompositeFuture all(Future<T1> f1, Future<T2> f2) { static <T1, T2> CompositeFuture all(Future<T1> f1, Future<T2> f2) {
return CompositeFutureImpl.all(f1, f2); return CompositeFutureImpl.all(f1, f2);
}
/**
* Like {@link #all(Future, Future)} but with 3 futures.
*/
static <T1, T2, T3> CompositeFuture all(Future<T1> f1, Future<T2> f2, Future<T3> f3) {
return CompositeFutureImpl.all(f1, f2, f3);
}
/**
* Like {@link #all(Future, Future)} but with 4 futures.
*/
static <T1, T2, T3, T4> CompositeFuture all(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4) {
return CompositeFutureImpl.all(f1, f2, f3, f4);
}
/**
* Like {@link #all(Future, Future)} but with 5 futures.
*/
static <T1, T2, T3, T4, T5> CompositeFuture all(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4, Future<T5> f5) {
return CompositeFutureImpl.all(f1, f2, f3, f4, f5);
}
/**
* Like {@link #all(Future, Future)} but with 6 futures.
*/
static <T1, T2, T3, T4, T5, T6> CompositeFuture all(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4, Future<T5> f5, Future<T6> f6) {
return CompositeFutureImpl.all(f1, f2, f3, f4, f5, f6);
}
/**
* Like {@link #all(Future, Future)} but with a list of futures.<p>
*
* When the list is empty, the returned future will be already completed.
*/
static CompositeFuture all(List<Future> futures) {
return CompositeFutureImpl.all(futures.toArray(new Future[0]));
}
/**
* Return a composite future, succeeded when any futures is succeeded, failed when all futures are failed.
* <p/>
* The returned future succeeds as soon as one of {@code f1} or {@code f2} succeeds.
*
* @param f1 future
* @param f2 future
* @return the composite future
*/
static <T1, T2> CompositeFuture any(Future<T1> f1, Future<T2> f2) {
return CompositeFutureImpl.any(f1, f2);
}
/**
* Like {@link #any(Future, Future)} but with 3 futures.
*/
static <T1, T2, T3> CompositeFuture any(Future<T1> f1, Future<T2> f2, Future<T3> f3) {
return CompositeFutureImpl.any(f1, f2, f3);
}
/**
* Like {@link #any(Future, Future)} but with 4 futures.
*/
static <T1, T2, T3, T4> CompositeFuture any(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4) {
return CompositeFutureImpl.any(f1, f2, f3, f4);
}
/**
* Like {@link #any(Future, Future)} but with 5 futures.
*/
static <T1, T2, T3, T4, T5> CompositeFuture any(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4, Future<T5> f5) {
return CompositeFutureImpl.any(f1, f2, f3, f4, f5);
}
/**
* Like {@link #any(Future, Future)} but with 6 futures.
*/
static <T1, T2, T3, T4, T5, T6> CompositeFuture any(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4, Future<T5> f5, Future<T6> f6) {
return CompositeFutureImpl.any(f1, f2, f3, f4, f5, f6);
}
/**
* Like {@link #any(Future, Future)} but with a list of futures.<p>
*
* When the list is empty, the returned future will be already completed.
*/
static CompositeFuture any(List<Future> futures) {
return CompositeFutureImpl.any(futures.toArray(new Future[0]));
}
/**
* Return a composite future, succeeded when all futures are succeeded, failed when any future is failed.
* <p/>
* It always wait until all its futures are completed and will not fail as soon as one of {@code f1} or {@code f2} fails.
*
* @param f1 future
* @param f2 future
* @return the composite future
*/
static <T1, T2> CompositeFuture join(Future<T1> f1, Future<T2> f2) {
return CompositeFutureImpl.join(f1, f2);
}
/**
* Like {@link #join(Future, Future)} but with 3 futures.
*/
static <T1, T2, T3> CompositeFuture join(Future<T1> f1, Future<T2> f2, Future<T3> f3) {
return CompositeFutureImpl.join(f1, f2, f3);
}
/**
* Like {@link #join(Future, Future)} but with 4 futures.
*/
static <T1, T2, T3, T4> CompositeFuture join(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4) {
return CompositeFutureImpl.join(f1, f2, f3, f4);
}
/**
* Like {@link #join(Future, Future)} but with 5 futures.
*/
static <T1, T2, T3, T4, T5> CompositeFuture join(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4, Future<T5> f5) {
return CompositeFutureImpl.join(f1, f2, f3, f4, f5);
}
/**
* Like {@link #join(Future, Future)} but with 6 futures.
*/
static <T1, T2, T3, T4, T5, T6> CompositeFuture join(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4, Future<T5> f5, Future<T6> f6) {
return CompositeFutureImpl.join(f1, f2, f3, f4, f5, f6);
}
/**
* Like {@link #join(Future, Future)} but with a list of futures.<p>
*
* When the list is empty, the returned future will be already completed.
*/
static CompositeFuture join(List<Future> futures) {
return CompositeFutureImpl.join(futures.toArray(new Future[0]));
}
@Override
CompositeFuture onComplete(Handler<AsyncResult<CompositeFuture>> handler);
@Override
default CompositeFuture onSuccess(Handler<CompositeFuture> handler) {
Future.super.onSuccess(handler);
return this;
}
@Override
default CompositeFuture onFailure(Handler<Throwable> handler) {
Future.super.onFailure(handler);
return this;
}
/**
* Returns a cause of a wrapped future
*
* @param index the wrapped future index
*/
Throwable cause(int index);
/**
* Returns true if a wrapped future is succeeded
*
* @param index the wrapped future index
*/
boolean succeeded(int index);
/**
* Returns true if a wrapped future is failed
*
* @param index the wrapped future index
*/
boolean failed(int index);
/**
* Returns true if a wrapped future is completed
*
* @param index the wrapped future index
*/
boolean isComplete(int index);
/**
* Returns the result of a wrapped future
*
* @param index the wrapped future index
*/
<T> T resultAt(int index);
/**
* @return the number of wrapped future
*/
int size();
/**
* @return a list of the current completed values. If one future is not yet resolved or is failed, {@code} null
* will be used
*/
default <T> List<T> list() {
int size = size();
ArrayList<T> list = new ArrayList<>(size);
for (int index = 0;index < size;index++) {
list.add(resultAt(index));
} }
return list;
}
/** /**
* @return a list of all the eventual failure causes. If no future failed, returns a list of null values. * Like {@link #all(Future, Future)} but with 3 futures.
*/ */
default List<Throwable> causes() { static <T1, T2, T3> CompositeFuture all(Future<T1> f1, Future<T2> f2, Future<T3> f3) {
int size = size(); return CompositeFutureImpl.all(f1, f2, f3);
ArrayList<Throwable> list = new ArrayList<>(size); }
for (int index = 0; index < size; index++) {
list.add(cause(index)); /**
* Like {@link #all(Future, Future)} but with 4 futures.
*/
static <T1, T2, T3, T4> CompositeFuture all(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4) {
return CompositeFutureImpl.all(f1, f2, f3, f4);
}
/**
* Like {@link #all(Future, Future)} but with 5 futures.
*/
static <T1, T2, T3, T4, T5> CompositeFuture all(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4, Future<T5> f5) {
return CompositeFutureImpl.all(f1, f2, f3, f4, f5);
}
/**
* Like {@link #all(Future, Future)} but with 6 futures.
*/
static <T1, T2, T3, T4, T5, T6> CompositeFuture all(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4, Future<T5> f5, Future<T6> f6) {
return CompositeFutureImpl.all(f1, f2, f3, f4, f5, f6);
}
/**
* Like {@link #all(Future, Future)} but with a list of futures.<p>
*
* When the list is empty, the returned future will be already completed.
*/
static CompositeFuture all(List<Future<?>> futures) {
return CompositeFutureImpl.all(futures.toArray(new Future[0]));
}
/**
* Return a composite future, succeeded when any futures is succeeded, failed when all futures are failed.
* <p/>
* The returned future succeeds as soon as one of {@code f1} or {@code f2} succeeds.
*
* @param f1 future
* @param f2 future
* @return the composite future
*/
static <T1, T2> CompositeFuture any(Future<T1> f1, Future<T2> f2) {
return CompositeFutureImpl.any(f1, f2);
}
/**
* Like {@link #any(Future, Future)} but with 3 futures.
*/
static <T1, T2, T3> CompositeFuture any(Future<T1> f1, Future<T2> f2, Future<T3> f3) {
return CompositeFutureImpl.any(f1, f2, f3);
}
/**
* Like {@link #any(Future, Future)} but with 4 futures.
*/
static <T1, T2, T3, T4> CompositeFuture any(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4) {
return CompositeFutureImpl.any(f1, f2, f3, f4);
}
/**
* Like {@link #any(Future, Future)} but with 5 futures.
*/
static <T1, T2, T3, T4, T5> CompositeFuture any(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4, Future<T5> f5) {
return CompositeFutureImpl.any(f1, f2, f3, f4, f5);
}
/**
* Like {@link #any(Future, Future)} but with 6 futures.
*/
static <T1, T2, T3, T4, T5, T6> CompositeFuture any(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4, Future<T5> f5, Future<T6> f6) {
return CompositeFutureImpl.any(f1, f2, f3, f4, f5, f6);
}
/**
* Like {@link #any(Future, Future)} but with a list of futures.<p>
*
* When the list is empty, the returned future will be already completed.
*/
static CompositeFuture any(List<Future<?>> futures) {
return CompositeFutureImpl.any(futures.toArray(new Future[0]));
}
/**
* Return a composite future, succeeded when all futures are succeeded, failed when any future is failed.
* <p/>
* It always wait until all its futures are completed and will not fail as soon as one of {@code f1} or {@code f2} fails.
*
* @param f1 future
* @param f2 future
* @return the composite future
*/
static <T1, T2> CompositeFuture join(Future<T1> f1, Future<T2> f2) {
return CompositeFutureImpl.join(f1, f2);
}
/**
* Like {@link #join(Future, Future)} but with 3 futures.
*/
static <T1, T2, T3> CompositeFuture join(Future<T1> f1, Future<T2> f2, Future<T3> f3) {
return CompositeFutureImpl.join(f1, f2, f3);
}
/**
* Like {@link #join(Future, Future)} but with 4 futures.
*/
static <T1, T2, T3, T4> CompositeFuture join(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4) {
return CompositeFutureImpl.join(f1, f2, f3, f4);
}
/**
* Like {@link #join(Future, Future)} but with 5 futures.
*/
static <T1, T2, T3, T4, T5> CompositeFuture join(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4, Future<T5> f5) {
return CompositeFutureImpl.join(f1, f2, f3, f4, f5);
}
/**
* Like {@link #join(Future, Future)} but with 6 futures.
*/
static <T1, T2, T3, T4, T5, T6> CompositeFuture join(Future<T1> f1, Future<T2> f2, Future<T3> f3, Future<T4> f4, Future<T5> f5, Future<T6> f6) {
return CompositeFutureImpl.join(f1, f2, f3, f4, f5, f6);
}
/**
* Like {@link #join(Future, Future)} but with a list of futures.<p>
*
* When the list is empty, the returned future will be already completed.
*/
static CompositeFuture join(List<Future<?>> futures) {
return CompositeFutureImpl.join(futures.toArray(new Future[0]));
}
@Override
CompositeFuture onComplete(Handler<AsyncResult<CompositeFuture>> handler);
@Override
default CompositeFuture onSuccess(Handler<CompositeFuture> handler) {
Future.super.onSuccess(handler);
return this;
}
@Override
default CompositeFuture onFailure(Handler<Throwable> handler) {
Future.super.onFailure(handler);
return this;
}
/**
* Returns a cause of a wrapped future
*
* @param index the wrapped future index
*/
Throwable cause(int index);
/**
* Returns true if a wrapped future is succeeded
*
* @param index the wrapped future index
*/
boolean succeeded(int index);
/**
* Returns true if a wrapped future is failed
*
* @param index the wrapped future index
*/
boolean failed(int index);
/**
* Returns true if a wrapped future is completed
*
* @param index the wrapped future index
*/
boolean isComplete(int index);
/**
* Returns the result of a wrapped future
*
* @param index the wrapped future index
*/
<T> T resultAt(int index);
/**
* @return the number of wrapped future
*/
int size();
/**
* @return a list of the current completed values. If one future is not yet resolved or is failed, {@code} null
* will be used
*/
default <T> List<T> list() {
int size = size();
ArrayList<T> list = new ArrayList<>(size);
for (int index = 0;index < size;index++) {
list.add(resultAt(index));
}
return list;
}
/**
* @return a list of all the eventual failure causes. If no future failed, returns a list of null values.
*/
default List<Throwable> causes() {
int size = size();
ArrayList<Throwable> list = new ArrayList<>(size);
for (int index = 0; index < size; index++) {
list.add(cause(index));
}
return list;
} }
return list;
}
} }

View file

@ -13,402 +13,403 @@ import java.util.function.Function;
*/ */
public interface Future<T> extends AsyncResult<T> { public interface Future<T> extends AsyncResult<T> {
/** /**
* Create a future that hasn't completed yet and that is passed to the {@code handler} before it is returned. * Create a future that hasn't completed yet and that is passed to the {@code handler} before it is returned.
* *
* @param handler the handler * @param handler the handler
* @param <T> the result type * @param <T> the result type
* @return the future. * @return the future.
*/ */
static <T> Future<T> future(Handler<Promise<T>> handler) { static <T> Future<T> future(Handler<Promise<T>> handler) {
Promise<T> promise = Promise.promise(); Promise<T> promise = Promise.promise();
try { try {
handler.handle(promise); handler.handle(promise);
} catch (Throwable e){ } catch (Throwable e){
promise.tryFail(e); promise.tryFail(e);
}
return promise.future();
} }
return promise.future();
}
/** /**
* Create a succeeded future with a null result * Create a succeeded future with a null result
* *
* @param <T> the result type * @param <T> the result type
* @return the future * @return the future
*/ */
static <T> Future<T> succeededFuture() { @SuppressWarnings("unchecked")
return (Future<T>) SucceededFuture.EMPTY; static <T> Future<T> succeededFuture() {
} return (Future<T>) SucceededFuture.EMPTY;
/**
* Created a succeeded future with the specified result.
*
* @param result the result
* @param <T> the result type
* @return the future
*/
static <T> Future<T> succeededFuture(T result) {
if (result == null) {
return succeededFuture();
} else {
return new SucceededFuture<>(result);
} }
}
/** /**
* Create a failed future with the specified failure cause. * Created a succeeded future with the specified result.
* *
* @param t the failure cause as a Throwable * @param result the result
* @param <T> the result type * @param <T> the result type
* @return the future * @return the future
*/ */
static <T> Future<T> failedFuture(Throwable t) { static <T> Future<T> succeededFuture(T result) {
return new FailedFuture<>(t); if (result == null) {
} return succeededFuture();
} else {
return new SucceededFuture<>(result);
}
}
/** /**
* Create a failed future with the specified failure message. * Create a failed future with the specified failure cause.
* *
* @param failureMessage the failure message * @param t the failure cause as a Throwable
* @param <T> the result type * @param <T> the result type
* @return the future * @return the future
*/ */
static <T> Future<T> failedFuture(String failureMessage) { static <T> Future<T> failedFuture(Throwable t) {
return new FailedFuture<>(failureMessage); return new FailedFuture<>(t);
} }
/** /**
* Has the future completed? * Create a failed future with the specified failure message.
* <p> *
* It's completed if it's either succeeded or failed. * @param failureMessage the failure message
* * @param <T> the result type
* @return true if completed, false if not * @return the future
*/ */
boolean isComplete(); static <T> Future<T> failedFuture(String failureMessage) {
return new FailedFuture<>(failureMessage);
}
/** /**
* Add a handler to be notified of the result. * Has the future completed?
* <p> * <p>
* <em><strong>WARNING</strong></em>: this is a terminal operation. * It's completed if it's either succeeded or failed.
* If several {@code handler}s are registered, there is no guarantee that they will be invoked in order of registration. *
* * @return true if completed, false if not
* @param handler the handler that will be called with the result */
* @return a reference to this, so it can be used fluently boolean isComplete();
*/
Future<T> onComplete(Handler<AsyncResult<T>> handler);
/** /**
* Add a handler to be notified of the succeeded result. * Add a handler to be notified of the result.
* <p> * <p>
* <em><strong>WARNING</strong></em>: this is a terminal operation. * <em><strong>WARNING</strong></em>: this is a terminal operation.
* If several {@code handler}s are registered, there is no guarantee that they will be invoked in order of registration. * If several {@code handler}s are registered, there is no guarantee that they will be invoked in order of registration.
* *
* @param handler the handler that will be called with the succeeded result * @param handler the handler that will be called with the result
* @return a reference to this, so it can be used fluently * @return a reference to this, so it can be used fluently
*/ */
default Future<T> onSuccess(Handler<T> handler) { Future<T> onComplete(Handler<AsyncResult<T>> handler);
return onComplete(ar -> {
if (ar.succeeded()) {
handler.handle(ar.result());
}
});
}
/** /**
* Add a handler to be notified of the failed result. * Add a handler to be notified of the succeeded result.
* <p> * <p>
* <em><strong>WARNING</strong></em>: this is a terminal operation. * <em><strong>WARNING</strong></em>: this is a terminal operation.
* If several {@code handler}s are registered, there is no guarantee that they will be invoked in order of registration. * If several {@code handler}s are registered, there is no guarantee that they will be invoked in order of registration.
* *
* @param handler the handler that will be called with the failed result * @param handler the handler that will be called with the succeeded result
* @return a reference to this, so it can be used fluently * @return a reference to this, so it can be used fluently
*/ */
default Future<T> onFailure(Handler<Throwable> handler) { default Future<T> onSuccess(Handler<T> handler) {
return onComplete(ar -> { return onComplete(ar -> {
if (ar.failed()) { if (ar.succeeded()) {
handler.handle(ar.cause()); handler.handle(ar.result());
} }
}); });
} }
/** /**
* The result of the operation. This will be null if the operation failed. * Add a handler to be notified of the failed result.
* * <p>
* @return the result or null if the operation failed. * <em><strong>WARNING</strong></em>: this is a terminal operation.
*/ * If several {@code handler}s are registered, there is no guarantee that they will be invoked in order of registration.
@Override *
T result(); * @param handler the handler that will be called with the failed result
* @return a reference to this, so it can be used fluently
*/
default Future<T> onFailure(Handler<Throwable> handler) {
return onComplete(ar -> {
if (ar.failed()) {
handler.handle(ar.cause());
}
});
}
/** /**
* A Throwable describing failure. This will be null if the operation succeeded. * The result of the operation. This will be null if the operation failed.
* *
* @return the cause or null if the operation succeeded. * @return the result or null if the operation failed.
*/ */
@Override @Override
Throwable cause(); T result();
/** /**
* Did it succeed? * A Throwable describing failure. This will be null if the operation succeeded.
* *
* @return true if it succeded or false otherwise * @return the cause or null if the operation succeeded.
*/ */
@Override @Override
boolean succeeded(); Throwable cause();
/** /**
* Did it fail? * Did it succeed?
* *
* @return true if it failed or false otherwise * @return true if it succeded or false otherwise
*/ */
@Override @Override
boolean failed(); boolean succeeded();
/** /**
* Alias for {@link #compose(Function)}. * Did it fail?
*/ *
default <U> Future<U> flatMap(Function<T, Future<U>> mapper) { * @return true if it failed or false otherwise
return compose(mapper); */
} @Override
boolean failed();
/** /**
* Compose this future with a {@code mapper} function.<p> * Alias for {@link #compose(Function)}.
* */
* When this future (the one on which {@code compose} is called) succeeds, the {@code mapper} will be called with default <U> Future<U> flatMap(Function<T, Future<U>> mapper) {
* the completed value and this mapper returns another future object. This returned future completion will complete return compose(mapper);
* the future returned by this method call.<p> }
*
* If the {@code mapper} throws an exception, the returned future will be failed with this exception.<p>
*
* When this future fails, the failure will be propagated to the returned future and the {@code mapper}
* will not be called.
*
* @param mapper the mapper function
* @return the composed future
*/
default <U> Future<U> compose(Function<T, Future<U>> mapper) {
return compose(mapper, Future::failedFuture);
}
/** /**
* Handles a failure of this Future by returning the result of another Future. * Compose this future with a {@code mapper} function.<p>
* If the mapper fails, then the returned future will be failed with this failure. *
* * When this future (the one on which {@code compose} is called) succeeds, the {@code mapper} will be called with
* @param mapper A function which takes the exception of a failure and returns a new future. * the completed value and this mapper returns another future object. This returned future completion will complete
* @return A recovered future * the future returned by this method call.<p>
*/ *
default Future<T> recover(Function<Throwable, Future<T>> mapper) { * If the {@code mapper} throws an exception, the returned future will be failed with this exception.<p>
return compose(Future::succeededFuture, mapper); *
} * When this future fails, the failure will be propagated to the returned future and the {@code mapper}
* will not be called.
*
* @param mapper the mapper function
* @return the composed future
*/
default <U> Future<U> compose(Function<T, Future<U>> mapper) {
return compose(mapper, Future::failedFuture);
}
/** /**
* Compose this future with a {@code successMapper} and {@code failureMapper} functions.<p> * Handles a failure of this Future by returning the result of another Future.
* * If the mapper fails, then the returned future will be failed with this failure.
* When this future (the one on which {@code compose} is called) succeeds, the {@code successMapper} will be called with *
* the completed value and this mapper returns another future object. This returned future completion will complete * @param mapper A function which takes the exception of a failure and returns a new future.
* the future returned by this method call.<p> * @return A recovered future
* */
* When this future (the one on which {@code compose} is called) fails, the {@code failureMapper} will be called with default Future<T> recover(Function<Throwable, Future<T>> mapper) {
* the failure and this mapper returns another future object. This returned future completion will complete return compose(Future::succeededFuture, mapper);
* the future returned by this method call.<p> }
*
* If any mapper function throws an exception, the returned future will be failed with this exception.<p>
*
* @param successMapper the function mapping the success
* @param failureMapper the function mapping the failure
* @return the composed future
*/
<U> Future<U> compose(Function<T, Future<U>> successMapper, Function<Throwable, Future<U>> failureMapper);
/** /**
* Transform this future with a {@code mapper} functions.<p> * Compose this future with a {@code successMapper} and {@code failureMapper} functions.<p>
* *
* When this future (the one on which {@code transform} is called) completes, the {@code mapper} will be called with * When this future (the one on which {@code compose} is called) succeeds, the {@code successMapper} will be called with
* the async result and this mapper returns another future object. This returned future completion will complete * the completed value and this mapper returns another future object. This returned future completion will complete
* the future returned by this method call.<p> * the future returned by this method call.<p>
* *
* If any mapper function throws an exception, the returned future will be failed with this exception.<p> * When this future (the one on which {@code compose} is called) fails, the {@code failureMapper} will be called with
* * the failure and this mapper returns another future object. This returned future completion will complete
* @param mapper the function mapping the future * the future returned by this method call.<p>
* @return the transformed future *
*/ * If any mapper function throws an exception, the returned future will be failed with this exception.<p>
<U> Future<U> transform(Function<AsyncResult<T>, Future<U>> mapper); *
* @param successMapper the function mapping the success
* @param failureMapper the function mapping the failure
* @return the composed future
*/
<U> Future<U> compose(Function<T, Future<U>> successMapper, Function<Throwable, Future<U>> failureMapper);
/** /**
* Compose this future with a {@code mapper} that will be always be called. * Transform this future with a {@code mapper} functions.<p>
* *
* <p>When this future (the one on which {@code eventually} is called) completes, the {@code mapper} will be called * When this future (the one on which {@code transform} is called) completes, the {@code mapper} will be called with
* and this mapper returns another future object. This returned future completion will complete the future returned * the async result and this mapper returns another future object. This returned future completion will complete
* by this method call with the original result of the future. * the future returned by this method call.<p>
* *
* <p>The outcome of the future returned by the {@code mapper} will not influence the nature * If any mapper function throws an exception, the returned future will be failed with this exception.<p>
* of the returned future. *
* * @param mapper the function mapping the future
* @param mapper the function returning the future. * @return the transformed future
* @return the composed future */
*/ <U> Future<U> transform(Function<AsyncResult<T>, Future<U>> mapper);
<U> Future<T> eventually(Function<Void, Future<U>> mapper);
/** /**
* Apply a {@code mapper} function on this future.<p> * Compose this future with a {@code mapper} that will be always be called.
* *
* When this future succeeds, the {@code mapper} will be called with the completed value and this mapper * <p>When this future (the one on which {@code eventually} is called) completes, the {@code mapper} will be called
* returns a value. This value will complete the future returned by this method call.<p> * and this mapper returns another future object. This returned future completion will complete the future returned
* * by this method call with the original result of the future.
* If the {@code mapper} throws an exception, the returned future will be failed with this exception.<p> *
* * <p>The outcome of the future returned by the {@code mapper} will not influence the nature
* When this future fails, the failure will be propagated to the returned future and the {@code mapper} * of the returned future.
* will not be called. *
* * @param mapper the function returning the future.
* @param mapper the mapper function * @return the composed future
* @return the mapped future */
*/ <U> Future<T> eventually(Function<Void, Future<U>> mapper);
<U> Future<U> map(Function<T, U> mapper);
/** /**
* Map the result of a future to a specific {@code value}.<p> * Apply a {@code mapper} function on this future.<p>
* *
* When this future succeeds, this {@code value} will complete the future returned by this method call.<p> * When this future succeeds, the {@code mapper} will be called with the completed value and this mapper
* * returns a value. This value will complete the future returned by this method call.<p>
* When this future fails, the failure will be propagated to the returned future. *
* * If the {@code mapper} throws an exception, the returned future will be failed with this exception.<p>
* @param value the value that eventually completes the mapped future *
* @return the mapped future * When this future fails, the failure will be propagated to the returned future and the {@code mapper}
*/ * will not be called.
<V> Future<V> map(V value); *
* @param mapper the mapper function
* @return the mapped future
*/
<U> Future<U> map(Function<T, U> mapper);
/** /**
* Map the result of a future to {@code null}.<p> * Map the result of a future to a specific {@code value}.<p>
* *
* This is a conveniency for {@code future.map((T) null)} or {@code future.map((Void) null)}.<p> * When this future succeeds, this {@code value} will complete the future returned by this method call.<p>
* *
* When this future succeeds, {@code null} will complete the future returned by this method call.<p> * When this future fails, the failure will be propagated to the returned future.
* *
* When this future fails, the failure will be propagated to the returned future. * @param value the value that eventually completes the mapped future
* * @return the mapped future
* @return the mapped future */
*/ <V> Future<V> map(V value);
@Override
default <V> Future<V> mapEmpty() {
return (Future<V>) AsyncResult.super.mapEmpty();
}
/** /**
* Apply a {@code mapper} function on this future.<p> * Map the result of a future to {@code null}.<p>
* *
* When this future fails, the {@code mapper} will be called with the completed value and this mapper * This is a conveniency for {@code future.map((T) null)} or {@code future.map((Void) null)}.<p>
* returns a value. This value will complete the future returned by this method call.<p> *
* * When this future succeeds, {@code null} will complete the future returned by this method call.<p>
* If the {@code mapper} throws an exception, the returned future will be failed with this exception.<p> *
* * When this future fails, the failure will be propagated to the returned future.
* When this future succeeds, the result will be propagated to the returned future and the {@code mapper} *
* will not be called. * @return the mapped future
* */
* @param mapper the mapper function @Override
* @return the mapped future default <V> Future<V> mapEmpty() {
*/ return (Future<V>) AsyncResult.super.mapEmpty();
Future<T> otherwise(Function<Throwable, T> mapper); }
/** /**
* Map the failure of a future to a specific {@code value}.<p> * Apply a {@code mapper} function on this future.<p>
* *
* When this future fails, this {@code value} will complete the future returned by this method call.<p> * When this future fails, the {@code mapper} will be called with the completed value and this mapper
* * returns a value. This value will complete the future returned by this method call.<p>
* When this future succeeds, the result will be propagated to the returned future. *
* * If the {@code mapper} throws an exception, the returned future will be failed with this exception.<p>
* @param value the value that eventually completes the mapped future *
* @return the mapped future * When this future succeeds, the result will be propagated to the returned future and the {@code mapper}
*/ * will not be called.
Future<T> otherwise(T value); *
* @param mapper the mapper function
* @return the mapped future
*/
Future<T> otherwise(Function<Throwable, T> mapper);
/** /**
* Map the failure of a future to {@code null}.<p> * Map the failure of a future to a specific {@code value}.<p>
* *
* This is a convenience for {@code future.otherwise((T) null)}.<p> * When this future fails, this {@code value} will complete the future returned by this method call.<p>
* *
* When this future fails, the {@code null} value will complete the future returned by this method call.<p> * When this future succeeds, the result will be propagated to the returned future.
* *
* When this future succeeds, the result will be propagated to the returned future. * @param value the value that eventually completes the mapped future
* * @return the mapped future
* @return the mapped future */
*/ Future<T> otherwise(T value);
default Future<T> otherwiseEmpty() {
return (Future<T>) AsyncResult.super.otherwiseEmpty();
}
/** /**
* Invokes the given {@code handler} upon completion. * Map the failure of a future to {@code null}.<p>
* <p> *
* If the {@code handler} throws an exception, the returned future will be failed with this exception. * This is a convenience for {@code future.otherwise((T) null)}.<p>
* *
* @param handler invoked upon completion of this future * When this future fails, the {@code null} value will complete the future returned by this method call.<p>
* @return a future completed after the {@code handler} has been invoked *
*/ * When this future succeeds, the result will be propagated to the returned future.
default Future<T> andThen(Handler<AsyncResult<T>> handler) { *
return transform(ar -> { * @return the mapped future
handler.handle(ar); */
return (Future<T>) ar; default Future<T> otherwiseEmpty() {
}); return (Future<T>) AsyncResult.super.otherwiseEmpty();
} }
/** /**
* Bridges this Vert.x future to a {@link CompletionStage} instance. * Invokes the given {@code handler} upon completion.
* <p> * <p>
* The {@link CompletionStage} handling methods will be called from the thread that resolves this future. * If the {@code handler} throws an exception, the returned future will be failed with this exception.
* *
* @return a {@link CompletionStage} that completes when this future resolves * @param handler invoked upon completion of this future
*/ * @return a future completed after the {@code handler} has been invoked
default CompletionStage<T> toCompletionStage() { */
CompletableFuture<T> completableFuture = new CompletableFuture<>(); default Future<T> andThen(Handler<AsyncResult<T>> handler) {
onComplete(ar -> { return transform(ar -> {
if (ar.succeeded()) { handler.handle(ar);
completableFuture.complete(ar.result()); return (Future<T>) ar;
} else { });
completableFuture.completeExceptionally(ar.cause()); }
}
});
return completableFuture;
}
/** /**
* Bridges a {@link CompletionStage} object to a Vert.x future instance. * Bridges this Vert.x future to a {@link CompletionStage} instance.
* <p> * <p>
* The Vert.x future handling methods will be called from the thread that completes {@code completionStage}. * The {@link CompletionStage} handling methods will be called from the thread that resolves this future.
* *
* @param completionStage a completion stage * @return a {@link CompletionStage} that completes when this future resolves
* @param <T> the result type */
* @return a Vert.x future that resolves when {@code completionStage} resolves default CompletionStage<T> toCompletionStage() {
*/ CompletableFuture<T> completableFuture = new CompletableFuture<>();
static <T> Future<T> fromCompletionStage(CompletionStage<T> completionStage) { onComplete(ar -> {
Promise<T> promise = Promise.promise(); if (ar.succeeded()) {
completionStage.whenComplete((value, err) -> { completableFuture.complete(ar.result());
if (err != null) { } else {
promise.fail(err); completableFuture.completeExceptionally(ar.cause());
} else { }
promise.complete(value); });
} return completableFuture;
}); }
return promise.future();
}
/** /**
* Bridges a {@link CompletionStage} object to a Vert.x future instance. * Bridges a {@link CompletionStage} object to a Vert.x future instance.
* <p> * <p>
* The Vert.x future handling methods will be called on the provided {@code context}. * The Vert.x future handling methods will be called from the thread that completes {@code completionStage}.
* *
* @param completionStage a completion stage * @param completionStage a completion stage
* @param context a Vert.x context to dispatch to * @param <T> the result type
* @param <T> the result type * @return a Vert.x future that resolves when {@code completionStage} resolves
* @return a Vert.x future that resolves when {@code completionStage} resolves */
*/ static <T> Future<T> fromCompletionStage(CompletionStage<T> completionStage) {
static <T> Future<T> fromCompletionStage(CompletionStage<T> completionStage, Context context) { Promise<T> promise = Promise.promise();
Promise<T> promise = ((ContextInternal) context).promise(); completionStage.whenComplete((value, err) -> {
completionStage.whenComplete((value, err) -> { if (err != null) {
if (err != null) { promise.fail(err);
promise.fail(err); } else {
} else { promise.complete(value);
promise.complete(value); }
} });
}); return promise.future();
return promise.future(); }
}
/**
* Bridges a {@link CompletionStage} object to a Vert.x future instance.
* <p>
* The Vert.x future handling methods will be called on the provided {@code context}.
*
* @param completionStage a completion stage
* @param context a Vert.x context to dispatch to
* @param <T> the result type
* @return a Vert.x future that resolves when {@code completionStage} resolves
*/
static <T> Future<T> fromCompletionStage(CompletionStage<T> completionStage, Context context) {
Promise<T> promise = ((ContextInternal) context).promise();
completionStage.whenComplete((value, err) -> {
if (err != null) {
promise.fail(err);
} else {
promise.complete(value);
}
});
return promise.future();
}
} }

View file

@ -9,181 +9,178 @@ import java.util.function.Function;
public class CompositeFutureImpl extends FutureImpl<CompositeFuture> implements CompositeFuture { public class CompositeFutureImpl extends FutureImpl<CompositeFuture> implements CompositeFuture {
public static CompositeFuture all(Future<?>... results) { public static CompositeFuture all(Future<?>... results) {
CompositeFutureImpl composite = new CompositeFutureImpl(results); CompositeFutureImpl composite = new CompositeFutureImpl(results);
int len = results.length; int len = results.length;
for (Future<?> result : results) { for (Future<?> result : results) {
result.onComplete(ar -> { result.onComplete(ar -> {
if (ar.succeeded()) { if (ar.succeeded()) {
synchronized (composite) { synchronized (composite) {
if (composite.count == len || ++composite.count != len) { if (composite.count == len || ++composite.count != len) {
return; return;
} }
} }
composite.trySucceed(); composite.trySucceed();
} else { } else {
synchronized (composite) { synchronized (composite) {
if (composite.count == len) { if (composite.count == len) {
return; return;
} }
composite.count = len; composite.count = len;
} }
composite.tryFail(ar.cause()); composite.tryFail(ar.cause());
}
});
} }
}); if (len == 0) {
} composite.trySucceed();
if (len == 0) {
composite.trySucceed();
}
return composite;
}
public static CompositeFuture any(Future<?>... results) {
CompositeFutureImpl composite = new CompositeFutureImpl(results);
int len = results.length;
for (Future<?> result : results) {
result.onComplete(ar -> {
if (ar.succeeded()) {
synchronized (composite) {
if (composite.count == len) {
return;
}
composite.count = len;
}
composite.trySucceed();
} else {
synchronized (composite) {
if (composite.count == len || ++composite.count != len) {
return;
}
}
composite.tryFail(ar.cause());
} }
}); return composite;
} }
if (results.length == 0) {
composite.trySucceed();
}
return composite;
}
private static final Function<CompositeFuture, Object> ALL = cf -> { public static CompositeFuture any(Future<?>... results) {
int size = cf.size(); CompositeFutureImpl composite = new CompositeFutureImpl(results);
for (int i = 0;i < size;i++) { int len = results.length;
if (!cf.succeeded(i)) { for (Future<?> result : results) {
return cf.cause(i); result.onComplete(ar -> {
} if (ar.succeeded()) {
} synchronized (composite) {
return cf; if (composite.count == len) {
}; return;
}
public static CompositeFuture join(Future<?>... results) { composite.count = len;
return join(ALL, results); }
} composite.trySucceed();
} else {
private static CompositeFuture join(Function<CompositeFuture, Object> pred, Future<?>... results) { synchronized (composite) {
CompositeFutureImpl composite = new CompositeFutureImpl(results); if (composite.count == len || ++composite.count != len) {
int len = results.length; return;
for (Future<?> result : results) { }
result.onComplete(ar -> { }
synchronized (composite) { composite.tryFail(ar.cause());
if (++composite.count < len) { }
return; });
}
} }
composite.complete(pred.apply(composite)); if (results.length == 0) {
}); composite.trySucceed();
}
return composite;
} }
if (len == 0) {
composite.trySucceed(); private static final Function<CompositeFuture, Object> ALL = cf -> {
int size = cf.size();
for (int i = 0;i < size;i++) {
if (!cf.succeeded(i)) {
return cf.cause(i);
}
}
return cf;
};
public static CompositeFuture join(Future<?>... results) {
return join(ALL, results);
} }
return composite;
}
private final Future[] results; private static CompositeFuture join(Function<CompositeFuture, Object> pred, Future<?>... results) {
private int count; CompositeFutureImpl composite = new CompositeFutureImpl(results);
int len = results.length;
private CompositeFutureImpl(Future<?>... results) { for (Future<?> result : results) {
this.results = results; result.onComplete(ar -> {
} synchronized (composite) {
if (++composite.count < len) {
@Override return;
public Throwable cause(int index) { }
return future(index).cause(); }
} composite.complete(pred.apply(composite));
});
@Override }
public boolean succeeded(int index) { if (len == 0) {
return future(index).succeeded(); composite.trySucceed();
} }
return composite;
@Override
public boolean failed(int index) {
return future(index).failed();
}
@Override
public boolean isComplete(int index) {
return future(index).isComplete();
}
@Override
public <T> T resultAt(int index) {
return this.<T>future(index).result();
}
private <T> Future<T> future(int index) {
if (index < 0 || index >= results.length) {
throw new IndexOutOfBoundsException();
} }
return (Future<T>) results[index];
}
@Override private final Future<?>[] results;
public int size() { private int count;
return results.length;
}
private void trySucceed() { private CompositeFutureImpl(Future<?>... results) {
tryComplete(this); this.results = results;
}
private void fail(Throwable t) {
complete(t);
}
private void complete(Object result) {
if (result == this) {
tryComplete(this);
} else if (result instanceof Throwable) {
tryFail((Throwable) result);
} }
}
@Override @Override
public CompositeFuture onComplete(Handler<AsyncResult<CompositeFuture>> handler) { public Throwable cause(int index) {
return (CompositeFuture)super.onComplete(handler); return future(index).cause();
} }
@Override @Override
public CompositeFuture onSuccess(Handler<CompositeFuture> handler) { public boolean succeeded(int index) {
return (CompositeFuture)super.onSuccess(handler); return future(index).succeeded();
} }
@Override @Override
public CompositeFuture onFailure(Handler<Throwable> handler) { public boolean failed(int index) {
return (CompositeFuture)super.onFailure(handler); return future(index).failed();
} }
@Override @Override
protected void formatValue(Object value, StringBuilder sb) { public boolean isComplete(int index) {
sb.append('('); return future(index).isComplete();
for (int i = 0;i < results.length;i++) { }
if (i > 0) {
sb.append(','); @Override
} public <T> T resultAt(int index) {
sb.append(results[i]); return this.<T>future(index).result();
}
@SuppressWarnings("unchecked")
private <T> Future<T> future(int index) {
if (index < 0 || index >= results.length) {
throw new IndexOutOfBoundsException();
}
return (Future<T>) results[index];
}
@Override
public int size() {
return results.length;
}
private void trySucceed() {
tryComplete(this);
}
private void complete(Object result) {
if (result == this) {
tryComplete(this);
} else if (result instanceof Throwable) {
tryFail((Throwable) result);
}
}
@Override
public CompositeFuture onComplete(Handler<AsyncResult<CompositeFuture>> handler) {
return (CompositeFuture)super.onComplete(handler);
}
@Override
public CompositeFuture onSuccess(Handler<CompositeFuture> handler) {
return (CompositeFuture)super.onSuccess(handler);
}
@Override
public CompositeFuture onFailure(Handler<Throwable> handler) {
return (CompositeFuture)super.onFailure(handler);
}
@Override
protected void formatValue(Object value, StringBuilder sb) {
sb.append('(');
for (int i = 0;i < results.length;i++) {
if (i > 0) {
sb.append(',');
}
sb.append(results[i]);
}
sb.append(')');
} }
sb.append(')');
}
} }

View file

@ -13,115 +13,118 @@ import java.util.function.Function;
*/ */
public final class FailedFuture<T> extends FutureBase<T> { public final class FailedFuture<T> extends FutureBase<T> {
private final Throwable cause; private final Throwable cause;
/** /**
* Create a future that has already failed * Create a future that has already failed
* @param t the throwable * @param t the throwable
*/ */
public FailedFuture(Throwable t) { public FailedFuture(Throwable t) {
this(null, t); this(null, t);
}
/**
* Create a future that has already failed
* @param t the throwable
*/
public FailedFuture(ContextInternal context, Throwable t) {
super(context);
this.cause = t != null ? t : new NoStackTraceThrowable(null);
}
/**
* Create a future that has already failed
* @param failureMessage the failure message
*/
public FailedFuture(String failureMessage) {
this(null, failureMessage);
}
/**
* Create a future that has already failed
* @param failureMessage the failure message
*/
public FailedFuture(ContextInternal context, String failureMessage) {
this(context, new NoStackTraceThrowable(failureMessage));
}
@Override
public boolean isComplete() {
return true;
}
@Override
public Future<T> onComplete(Handler<AsyncResult<T>> handler) {
if (handler instanceof Listener) {
emitFailure(cause, (Listener<T>) handler);
} else if (context != null) {
context.emit(this, handler);
} else {
handler.handle(this);
} }
return this;
}
@Override /**
public Future<T> onSuccess(Handler<T> handler) { * Create a future that has already failed
return this; * @param t the throwable
} */
public FailedFuture(ContextInternal context, Throwable t) {
@Override super(context);
public Future<T> onFailure(Handler<Throwable> handler) { this.cause = t != null ? t : new NoStackTraceThrowable(null);
if (context != null) {
context.emit(cause, handler);
} else {
handler.handle(cause);
} }
return this;
}
@Override /**
public void addListener(Listener<T> listener) { * Create a future that has already failed
emitFailure(cause, listener); * @param failureMessage the failure message
} */
public FailedFuture(String failureMessage) {
this(null, failureMessage);
}
@Override /**
public T result() { * Create a future that has already failed
return null; * @param failureMessage the failure message
} */
public FailedFuture(ContextInternal context, String failureMessage) {
this(context, new NoStackTraceThrowable(failureMessage));
}
@Override @Override
public Throwable cause() { public boolean isComplete() {
return cause; return true;
} }
@Override @SuppressWarnings("unchecked")
public boolean succeeded() { @Override
return false; public Future<T> onComplete(Handler<AsyncResult<T>> handler) {
} if (handler instanceof Listener) {
emitFailure(cause, (Listener<T>) handler);
} else if (context != null) {
context.emit(this, handler);
} else {
handler.handle(this);
}
return this;
}
@Override @Override
public boolean failed() { public Future<T> onSuccess(Handler<T> handler) {
return true; return this;
} }
@Override @Override
public <U> Future<U> map(Function<T, U> mapper) { public Future<T> onFailure(Handler<Throwable> handler) {
return (Future<U>) this; if (context != null) {
} context.emit(cause, handler);
} else {
handler.handle(cause);
}
return this;
}
@Override @Override
public <V> Future<V> map(V value) { public void addListener(Listener<T> listener) {
return (Future<V>) this; emitFailure(cause, listener);
} }
@Override @Override
public Future<T> otherwise(T value) { public T result() {
return new SucceededFuture<>(context, value); return null;
} }
@Override @Override
public String toString() { public Throwable cause() {
return "Future{cause=" + cause.getMessage() + "}"; return cause;
} }
@Override
public boolean succeeded() {
return false;
}
@Override
public boolean failed() {
return true;
}
@SuppressWarnings("unchecked")
@Override
public <U> Future<U> map(Function<T, U> mapper) {
return (Future<U>) this;
}
@SuppressWarnings("unchecked")
@Override
public <V> Future<V> map(V value) {
return (Future<V>) this;
}
@Override
public Future<T> otherwise(T value) {
return new SucceededFuture<>(context, value);
}
@Override
public String toString() {
return "Future{cause=" + cause.getMessage() + "}";
}
} }

View file

@ -14,255 +14,259 @@ import java.util.Objects;
*/ */
class FutureImpl<T> extends FutureBase<T> { class FutureImpl<T> extends FutureBase<T> {
private static final Object NULL_VALUE = new Object(); private static final Object NULL_VALUE = new Object();
private Object value; private Object value;
private Listener<T> listener; private Listener<T> listener;
/** /**
* Create a future that hasn't completed yet * Create a future that hasn't completed yet
*/ */
FutureImpl() { FutureImpl() {
super(); super();
} }
/** /**
* Create a future that hasn't completed yet * Create a future that hasn't completed yet
*/ */
FutureImpl(ContextInternal context) { FutureImpl(ContextInternal context) {
super(context); super(context);
} }
/** /**
* The result of the operation. This will be null if the operation failed. * The result of the operation. This will be null if the operation failed.
*/ */
public synchronized T result() { @SuppressWarnings("unchecked")
return value instanceof CauseHolder ? null : value == NULL_VALUE ? null : (T) value; public synchronized T result() {
} return value instanceof CauseHolder ? null : value == NULL_VALUE ? null : (T) value;
}
/** /**
* An exception describing failure. This will be null if the operation succeeded. * An exception describing failure. This will be null if the operation succeeded.
*/ */
public synchronized Throwable cause() { public synchronized Throwable cause() {
return value instanceof CauseHolder ? ((CauseHolder)value).cause : null; return value instanceof CauseHolder ? ((CauseHolder)value).cause : null;
} }
/** /**
* Did it succeed? * Did it succeed?
*/ */
public synchronized boolean succeeded() { public synchronized boolean succeeded() {
return value != null && !(value instanceof CauseHolder); return value != null && !(value instanceof CauseHolder);
} }
/** /**
* Did it fail? * Did it fail?
*/ */
public synchronized boolean failed() { public synchronized boolean failed() {
return value instanceof CauseHolder; return value instanceof CauseHolder;
} }
/** /**
* Has it completed? * Has it completed?
*/ */
public synchronized boolean isComplete() { public synchronized boolean isComplete() {
return value != null; return value != null;
} }
@Override @Override
public Future<T> onSuccess(Handler<T> handler) { public Future<T> onSuccess(Handler<T> handler) {
Objects.requireNonNull(handler, "No null handler accepted"); Objects.requireNonNull(handler, "No null handler accepted");
addListener(new Listener<T>() { addListener(new Listener<T>() {
@Override @Override
public void onSuccess(T value) { public void onSuccess(T value) {
try { try {
handler.handle(value); handler.handle(value);
} catch (Throwable t) { } catch (Throwable t) {
if (context != null) { if (context != null) {
context.reportException(t); context.reportException(t);
} else { } else {
throw t; throw t;
} }
}
}
@Override
public void onFailure(Throwable failure) {
}
});
return this;
}
@Override
public Future<T> onFailure(Handler<Throwable> handler) {
Objects.requireNonNull(handler, "No null handler accepted");
addListener(new Listener<T>() {
@Override
public void onSuccess(T value) {
}
@Override
public void onFailure(Throwable failure) {
try {
handler.handle(failure);
} catch (Throwable t) {
if (context != null) {
context.reportException(t);
} else {
throw t;
}
}
}
});
return this;
}
@SuppressWarnings("unchecked")
@Override
public Future<T> onComplete(Handler<AsyncResult<T>> handler) {
Objects.requireNonNull(handler, "No null handler accepted");
Listener<T> listener;
if (handler instanceof Listener) {
listener = (Listener<T>) handler;
} else {
listener = new Listener<T>() {
@Override
public void onSuccess(T value) {
try {
handler.handle(FutureImpl.this);
} catch (Throwable t) {
if (context != null) {
context.reportException(t);
} else {
throw t;
}
}
}
@Override
public void onFailure(Throwable failure) {
try {
handler.handle(FutureImpl.this);
} catch (Throwable t) {
if (context != null) {
context.reportException(t);
} else {
throw t;
}
}
}
};
} }
} addListener(listener);
@Override return this;
public void onFailure(Throwable failure) { }
}
});
return this;
}
@Override @SuppressWarnings("unchecked")
public Future<T> onFailure(Handler<Throwable> handler) { @Override
Objects.requireNonNull(handler, "No null handler accepted"); public void addListener(Listener<T> listener) {
addListener(new Listener<T>() { Object v;
@Override synchronized (this) {
public void onSuccess(T value) { v = value;
} if (v == null) {
@Override if (this.listener == null) {
public void onFailure(Throwable failure) { this.listener = listener;
try { } else {
handler.handle(failure); ListenerArray<T> listeners;
} catch (Throwable t) { if (this.listener instanceof FutureImpl.ListenerArray) {
if (context != null) { listeners = (ListenerArray<T>) this.listener;
context.reportException(t); } else {
} else { listeners = new ListenerArray<>();
throw t; listeners.add(this.listener);
} this.listener = listeners;
}
listeners.add(listener);
}
return;
}
} }
} if (v instanceof CauseHolder) {
}); emitFailure(((CauseHolder)v).cause, listener);
return this; } else {
} if (v == NULL_VALUE) {
v = null;
}
emitSuccess((T) v, listener);
}
}
@Override public boolean tryComplete(T result) {
public Future<T> onComplete(Handler<AsyncResult<T>> handler) { Listener<T> l;
Objects.requireNonNull(handler, "No null handler accepted"); synchronized (this) {
Listener<T> listener; if (value != null) {
if (handler instanceof Listener) { return false;
listener = (Listener<T>) handler; }
} else { value = result == null ? NULL_VALUE : result;
listener = new Listener<T>() { l = listener;
listener = null;
}
if (l != null) {
emitSuccess(result, l);
}
return true;
}
public boolean tryFail(Throwable cause) {
if (cause == null) {
cause = new NoStackTraceThrowable(null);
}
Listener<T> l;
synchronized (this) {
if (value != null) {
return false;
}
value = new CauseHolder(cause);
l = listener;
listener = null;
}
if (l != null) {
emitFailure(cause, l);
}
return true;
}
@Override
public String toString() {
synchronized (this) {
if (value instanceof CauseHolder) {
return "Future{cause=" + ((CauseHolder)value).cause.getMessage() + "}";
}
if (value != null) {
if (value == NULL_VALUE) {
return "Future{result=null}";
}
StringBuilder sb = new StringBuilder("Future{result=");
formatValue(value, sb);
sb.append("}");
return sb.toString();
}
return "Future{unresolved}";
}
}
protected void formatValue(Object value, StringBuilder sb) {
sb.append(value);
}
@SuppressWarnings("serial")
private static class ListenerArray<T> extends ArrayList<Listener<T>> implements Listener<T> {
@Override @Override
public void onSuccess(T value) { public void onSuccess(T value) {
try { for (Listener<T> handler : this) {
handler.handle(FutureImpl.this); handler.onSuccess(value);
} catch (Throwable t) {
if (context != null) {
context.reportException(t);
} else {
throw t;
} }
}
} }
@Override @Override
public void onFailure(Throwable failure) { public void onFailure(Throwable failure) {
try { for (Listener<T> handler : this) {
handler.handle(FutureImpl.this); handler.onFailure(failure);
} catch (Throwable t) {
if (context != null) {
context.reportException(t);
} else {
throw t;
} }
}
} }
};
} }
addListener(listener);
return this;
}
@Override private static class CauseHolder {
public void addListener(Listener<T> listener) {
Object v; private final Throwable cause;
synchronized (this) {
v = value; CauseHolder(Throwable cause) {
if (v == null) { this.cause = cause;
if (this.listener == null) {
this.listener = listener;
} else {
ListenerArray<T> listeners;
if (this.listener instanceof FutureImpl.ListenerArray) {
listeners = (ListenerArray<T>) this.listener;
} else {
listeners = new ListenerArray<>();
listeners.add(this.listener);
this.listener = listeners;
}
listeners.add(listener);
} }
return;
}
} }
if (v instanceof CauseHolder) {
emitFailure(((CauseHolder)v).cause, listener);
} else {
if (v == NULL_VALUE) {
v = null;
}
emitSuccess((T) v, listener);
}
}
public boolean tryComplete(T result) {
Listener<T> l;
synchronized (this) {
if (value != null) {
return false;
}
value = result == null ? NULL_VALUE : result;
l = listener;
listener = null;
}
if (l != null) {
emitSuccess(result, l);
}
return true;
}
public boolean tryFail(Throwable cause) {
if (cause == null) {
cause = new NoStackTraceThrowable(null);
}
Listener<T> l;
synchronized (this) {
if (value != null) {
return false;
}
value = new CauseHolder(cause);
l = listener;
listener = null;
}
if (l != null) {
emitFailure(cause, l);
}
return true;
}
@Override
public String toString() {
synchronized (this) {
if (value instanceof CauseHolder) {
return "Future{cause=" + ((CauseHolder)value).cause.getMessage() + "}";
}
if (value != null) {
if (value == NULL_VALUE) {
return "Future{result=null}";
}
StringBuilder sb = new StringBuilder("Future{result=");
formatValue(value, sb);
sb.append("}");
return sb.toString();
}
return "Future{unresolved}";
}
}
protected void formatValue(Object value, StringBuilder sb) {
sb.append(value);
}
private static class ListenerArray<T> extends ArrayList<Listener<T>> implements Listener<T> {
@Override
public void onSuccess(T value) {
for (Listener<T> handler : this) {
handler.onSuccess(value);
}
}
@Override
public void onFailure(Throwable failure) {
for (Listener<T> handler : this) {
handler.onFailure(failure);
}
}
}
private static class CauseHolder {
private final Throwable cause;
CauseHolder(Throwable cause) {
this.cause = cause;
}
}
} }

View file

@ -13,106 +13,106 @@ import java.util.function.Function;
*/ */
public final class SucceededFuture<T> extends FutureBase<T> { public final class SucceededFuture<T> extends FutureBase<T> {
/** /**
* Stateless instance of empty results that can be shared safely. * Stateless instance of empty results that can be shared safely.
*/ */
public static final SucceededFuture EMPTY = new SucceededFuture(null, null); public static final SucceededFuture<?> EMPTY = new SucceededFuture<>(null, null);
private final T result; private final T result;
/** /**
* Create a future that has already succeeded * Create a future that has already succeeded
* @param result the result * @param result the result
*/ */
public SucceededFuture(T result) { public SucceededFuture(T result) {
this(null, result); this(null, result);
}
/**
* Create a future that has already succeeded
* @param context the context
* @param result the result
*/
public SucceededFuture(ContextInternal context, T result) {
super(context);
this.result = result;
}
@Override
public boolean isComplete() {
return true;
}
@Override
public Future<T> onSuccess(Handler<T> handler) {
if (context != null) {
context.emit(result, handler);
} else {
handler.handle(result);
} }
return this;
}
@Override /**
public Future<T> onFailure(Handler<Throwable> handler) { * Create a future that has already succeeded
return this; * @param context the context
} * @param result the result
*/
@Override public SucceededFuture(ContextInternal context, T result) {
public Future<T> onComplete(Handler<AsyncResult<T>> handler) { super(context);
if (handler instanceof Listener) { this.result = result;
emitSuccess(result ,(Listener<T>) handler);
} else if (context != null) {
context.emit(this, handler);
} else {
handler.handle(this);
} }
return this;
}
@Override @Override
public void addListener(Listener<T> listener) { public boolean isComplete() {
emitSuccess(result ,listener); return true;
} }
@Override @Override
public T result() { public Future<T> onSuccess(Handler<T> handler) {
return result; if (context != null) {
} context.emit(result, handler);
} else {
handler.handle(result);
}
return this;
}
@Override @Override
public Throwable cause() { public Future<T> onFailure(Handler<Throwable> handler) {
return null; return this;
} }
@Override @Override
public boolean succeeded() { public Future<T> onComplete(Handler<AsyncResult<T>> handler) {
return true; if (handler instanceof Listener) {
} emitSuccess(result ,(Listener<T>) handler);
} else if (context != null) {
context.emit(this, handler);
} else {
handler.handle(this);
}
return this;
}
@Override @Override
public boolean failed() { public void addListener(Listener<T> listener) {
return false; emitSuccess(result ,listener);
} }
@Override @Override
public <V> Future<V> map(V value) { public T result() {
return new SucceededFuture<>(context, value); return result;
} }
@Override @Override
public Future<T> otherwise(Function<Throwable, T> mapper) { public Throwable cause() {
Objects.requireNonNull(mapper, "No null mapper accepted"); return null;
return this; }
}
@Override @Override
public Future<T> otherwise(T value) { public boolean succeeded() {
return this; return true;
} }
@Override @Override
public String toString() { public boolean failed() {
return "Future{result=" + result + "}"; return false;
} }
@Override
public <V> Future<V> map(V value) {
return new SucceededFuture<>(context, value);
}
@Override
public Future<T> otherwise(Function<Throwable, T> mapper) {
Objects.requireNonNull(mapper, "No null mapper accepted");
return this;
}
@Override
public Future<T> otherwise(T value) {
return this;
}
@Override
public String toString() {
return "Future{result=" + result + "}";
}
} }

View file

@ -85,7 +85,6 @@ public abstract class Dispatcher {
Objects.requireNonNull(subscribers); Objects.requireNonNull(subscribers);
Queue<Event> queueForThread = queue.get(); Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers)); queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) { if (!dispatching.get()) {
dispatching.set(true); dispatching.set(true);
try { try {

View file

@ -1,11 +1,9 @@
package org.xbib.event.bus; package org.xbib.event.bus;
import java.lang.reflect.Method;
import java.util.Iterator; import java.util.Iterator;
import java.util.Locale; import java.util.Locale;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.logging.Level; import java.util.logging.Level;
import java.util.logging.Logger; import java.util.logging.Logger;
@ -134,6 +132,10 @@ public class EventBus {
return identifier; return identifier;
} }
public SubscriberRegistry getSubscribers() {
return subscribers;
}
/** Returns the default executor this event bus uses for dispatching events to subscribers. */ /** Returns the default executor this event bus uses for dispatching events to subscribers. */
final Executor executor() { final Executor executor() {
return executor; return executor;
@ -183,7 +185,7 @@ public class EventBus {
* @param event event to post. * @param event event to post.
*/ */
public void post(Object event) { public void post(Object event) {
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event); Iterator<Subscriber> eventSubscribers = subscribers.getIterator(event);
if (eventSubscribers.hasNext()) { if (eventSubscribers.hasNext()) {
dispatcher.dispatch(event, eventSubscribers); dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) { } else if (!(event instanceof DeadEvent)) {
@ -197,33 +199,4 @@ public class EventBus {
return "EventBus{" + identifier + "}"; return "EventBus{" + identifier + "}";
} }
/** Simple logging handler for subscriber exceptions. */
static final class LoggingHandler implements SubscriberExceptionHandler {
static final LoggingHandler INSTANCE = new LoggingHandler();
@Override
public void handleException(Throwable exception, SubscriberExceptionContext context) {
Logger logger = logger(context);
if (logger.isLoggable(Level.SEVERE)) {
logger.log(Level.SEVERE, message(context), exception);
}
}
private static Logger logger(SubscriberExceptionContext context) {
return Logger.getLogger(EventBus.class.getName() + "." + context.getEventBus().identifier());
}
private static String message(SubscriberExceptionContext context) {
Method method = context.getSubscriberMethod();
return "Exception thrown by subscriber method "
+ method.getName()
+ '('
+ method.getParameterTypes()[0].getName()
+ ')'
+ " on subscriber "
+ context.getSubscriber()
+ " when dispatching event: "
+ context.getEvent();
}
}
} }

View file

@ -0,0 +1,37 @@
package org.xbib.event.bus;
import java.lang.reflect.Method;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* Simple logging handler for subscriber exceptions.
*/
final class LoggingHandler implements SubscriberExceptionHandler {
static final LoggingHandler INSTANCE = new LoggingHandler();
@Override
public void handleException(Throwable exception, SubscriberExceptionContext context) {
Logger logger = logger(context);
if (logger.isLoggable(Level.SEVERE)) {
logger.log(Level.SEVERE, message(context), exception);
}
}
private static Logger logger(SubscriberExceptionContext context) {
return Logger.getLogger(EventBus.class.getName() + "." + context.getEventBus().identifier());
}
private static String message(SubscriberExceptionContext context) {
Method method = context.getSubscriberMethod();
return "Exception thrown by subscriber method "
+ method.getName()
+ '('
+ method.getParameterTypes()[0].getName()
+ ')'
+ " on subscriber "
+ context.getSubscriber()
+ " when dispatching event: "
+ context.getEvent();
}
}

View file

@ -23,7 +23,7 @@ import org.xbib.event.bus.util.TypeToken;
/** /**
* Registry of subscribers to a single event bus. * Registry of subscribers to a single event bus.
*/ */
final class SubscriberRegistry { public final class SubscriberRegistry {
/** /**
* All registered subscribers, indexed by event type. * All registered subscribers, indexed by event type.
@ -43,7 +43,6 @@ final class SubscriberRegistry {
/** Registers all subscriber methods on the given listener object. */ /** Registers all subscriber methods on the given listener object. */
void register(Object listener) { void register(Object listener) {
MultiMap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener); MultiMap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) { for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
Class<?> eventType = entry.getKey(); Class<?> eventType = entry.getKey();
@ -53,7 +52,6 @@ final class SubscriberRegistry {
CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>(); CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
eventSubscribers = firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet); eventSubscribers = firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
} }
eventSubscribers.addAll(eventMethodsInListener); eventSubscribers.addAll(eventMethodsInListener);
} }
} }
@ -73,13 +71,12 @@ final class SubscriberRegistry {
throw new IllegalArgumentException( throw new IllegalArgumentException(
"missing event subscriber for an annotated method. Is " + listener + " registered?"); "missing event subscriber for an annotated method. Is " + listener + " registered?");
} }
// don't try to remove the set if it's empty; that can't be done safely without a lock // don't try to remove the set if it's empty; that can't be done safely without a lock
// anyway, if the set is empty it'll just be wrapping an array of length 0 // anyway, if the set is empty it'll just be wrapping an array of length 0
} }
} }
Set<Subscriber> getSubscribersForTesting(Class<?> eventType) { public Set<Subscriber> getSubscribersForTesting(Class<?> eventType) {
return firstNonNull(subscribers.get(eventType), new LinkedHashSet<>()); return firstNonNull(subscribers.get(eventType), new LinkedHashSet<>());
} }
@ -87,7 +84,7 @@ final class SubscriberRegistry {
* Gets an iterator representing an immutable snapshot of all subscribers to the given event at * Gets an iterator representing an immutable snapshot of all subscribers to the given event at
* the time this method is called. * the time this method is called.
*/ */
Iterator<Subscriber> getSubscribers(Object event) { Iterator<Subscriber> getIterator(Object event) {
Set<Class<?>> eventTypes = flattenHierarchy(event.getClass()); Set<Class<?>> eventTypes = flattenHierarchy(event.getClass());
List<Iterator<Subscriber>> subscriberIterators = new ArrayList<>(eventTypes.size()); List<Iterator<Subscriber>> subscriberIterators = new ArrayList<>(eventTypes.size());
for (Class<?> eventType : eventTypes) { for (Class<?> eventType : eventTypes) {

View file

@ -20,7 +20,4 @@ public class SimpleClockEventConsumer implements EventConsumer {
logger.info("received demo clock event, instant = " + event.getInstant()); logger.info("received demo clock event, instant = " + event.getInstant());
} }
@Override
public void close() throws IOException {
}
} }

View file

@ -0,0 +1,31 @@
package org.xbib.event.generic;
import org.xbib.event.DefaultEvent;
public class DefaultGenericEvent extends DefaultEvent implements GenericEvent {
private Listener listener;
public DefaultGenericEvent() {
this(null);
}
public DefaultGenericEvent(Listener listener) {
this.listener = listener;
}
public DefaultGenericEvent setListener(Listener listener) {
this.listener = listener;
return this;
}
public Listener getListener() {
return listener;
}
public void received() {
if (listener != null) {
listener.listen(this);
}
}
}

View file

@ -0,0 +1,6 @@
package org.xbib.event.generic;
import org.xbib.event.Event;
public interface GenericEvent extends Event {
}

View file

@ -1,9 +1,13 @@
package org.xbib.event.generic; package org.xbib.event.generic;
import java.io.Closeable; import java.util.Set;
import org.xbib.event.bus.AsyncEventBus; import java.util.concurrent.CompletableFuture;
public class GenericEventManager implements Closeable { import org.xbib.event.bus.AsyncEventBus;
import org.xbib.event.bus.Subscriber;
import org.xbib.event.bus.SubscriberRegistry;
public class GenericEventManager {
private final AsyncEventBus eventBus; private final AsyncEventBus eventBus;
@ -15,7 +19,36 @@ public class GenericEventManager implements Closeable {
eventBus.post(event); eventBus.post(event);
} }
@Override public void post(DefaultGenericEvent event,
public void close() { CompletableFuture<Boolean> future) {
SubscriberRegistry subscriberRegistry = eventBus.getSubscribers();
Set<Subscriber> set = subscriberRegistry.getSubscribersForTesting(event.getClass());
event.setListener(new WrappedListener(event.getListener(), set.size(), future));
post(event);
}
static class WrappedListener implements Listener {
private final Listener listener;
private int size;
private final CompletableFuture<Boolean> future;
public WrappedListener(Listener listener, int size, CompletableFuture<Boolean> future) {
this.listener = listener;
this.size = size;
this.future = future;
}
@Override
public void listen(GenericEvent event) {
if (listener != null) {
listener.listen(event);
}
if (--size == 0) {
future.complete(true);
}
}
} }
} }

View file

@ -0,0 +1,7 @@
package org.xbib.event.generic;
@FunctionalInterface
public interface Listener {
void listen(GenericEvent event);
}

View file

@ -10,6 +10,10 @@ import org.reactivestreams.Subscription;
* @param <T> * @param <T>
*/ */
public class EmptySubscriber<T> implements org.reactivestreams.Subscriber<T> { public class EmptySubscriber<T> implements org.reactivestreams.Subscriber<T> {
public EmptySubscriber() {
}
@Override @Override
public void onSubscribe(Subscription subscription) { public void onSubscribe(Subscription subscription) {
} }

View file

@ -10,6 +10,9 @@ public class DefaultFileFollowEvent extends DefaultEvent implements FileFollowEv
private String content; private String content;
public DefaultFileFollowEvent() {
}
@Override @Override
public void setPath(Path path) { public void setPath(Path path) {
this.path = path; this.path = path;

View file

@ -15,6 +15,10 @@ import java.util.concurrent.TimeoutException;
* Abstract base class for {@link EventExecutorGroup} implementations. * Abstract base class for {@link EventExecutorGroup} implementations.
*/ */
public abstract class AbstractEventExecutorGroup implements EventExecutorGroup { public abstract class AbstractEventExecutorGroup implements EventExecutorGroup {
protected AbstractEventExecutorGroup() {
}
@Override @Override
public Future<?> submit(Runnable task) { public Future<?> submit(Runnable task) {
return next().submit(task); return next().submit(task);

View file

@ -1097,7 +1097,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
@Override @Override
public long id() { public long id() {
return t.getId(); return t.threadId();
} }
@Override @Override

View file

@ -7,7 +7,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.regex.Matcher; import java.util.regex.Matcher;
public class CEFMessageParser extends MessageParser { public final class CEFMessageParser extends MessageParser {
private static final String CEF_PREFIX_PATTERN = "^(<(?<priority>\\d+)>)?(?<date>([a-zA-Z]{3}\\s+\\d+\\s+\\d+:\\d+:\\d+)|([0-9T:.Z-]+))\\s+(?<host>\\S+)\\s+CEF:(?<version>\\d+)\\|(?<data>.*)$"; private static final String CEF_PREFIX_PATTERN = "^(<(?<priority>\\d+)>)?(?<date>([a-zA-Z]{3}\\s+\\d+\\s+\\d+:\\d+:\\d+)|([0-9T:.Z-]+))\\s+(?<host>\\S+)\\s+CEF:(?<version>\\d+)\\|(?<data>.*)$";

View file

@ -201,6 +201,9 @@ public class DefaultSyslogMessage implements SyslogMessage {
Map<String, Object> map; Map<String, Object> map;
private Builder() {
}
public Builder date(LocalDateTime date) { public Builder date(LocalDateTime date) {
this.date = date; this.date = date;
return this; return this;

View file

@ -3,7 +3,7 @@ package org.xbib.event.syslog;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.regex.Matcher; import java.util.regex.Matcher;
public class RFC3164MessageParser extends MessageParser { public final class RFC3164MessageParser extends MessageParser {
private static final String PATTERN = "^(<(?<priority>\\d+)>)?(?<date>([a-zA-Z]{3}\\s+\\d+\\s+\\d+:\\d+:\\d+)|([0-9T:.Z-]+))\\s+(?<host>\\S+)\\s+((?<tag>[^\\[\\s\\]]+)(\\[(?<procid>\\d+)\\])?:)*\\s*(?<message>.+)$"; private static final String PATTERN = "^(<(?<priority>\\d+)>)?(?<date>([a-zA-Z]{3}\\s+\\d+\\s+\\d+:\\d+:\\d+)|([0-9T:.Z-]+))\\s+(?<host>\\S+)\\s+((?<tag>[^\\[\\s\\]]+)(\\[(?<procid>\\d+)\\])?:)*\\s*(?<message>.+)$";

View file

@ -7,7 +7,7 @@ import java.time.LocalDateTime;
import java.util.List; import java.util.List;
import java.util.regex.Matcher; import java.util.regex.Matcher;
public class RFC5424MessageParser extends MessageParser { public final class RFC5424MessageParser extends MessageParser {
private static final String PATTERN = "^<(?<priority>\\d+)>(?<version>\\d{1,3})\\s*(?<date>[0-9:+-TZ]+)\\s*(?<host>\\S+)\\s*(?<appname>\\S+)\\s*(?<procid>\\S+)\\s*(?<msgid>\\S+)\\s*(?<structureddata>(-|\\[.+\\]))\\s*(?<message>.+)$"; private static final String PATTERN = "^<(?<priority>\\d+)>(?<version>\\d{1,3})\\s*(?<date>[0-9:+-TZ]+)\\s*(?<host>\\S+)\\s*(?<appname>\\S+)\\s*(?<procid>\\S+)\\s*(?<msgid>\\S+)\\s*(?<structureddata>(-|\\[.+\\]))\\s*(?<message>.+)$";

View file

@ -64,7 +64,6 @@ public final class InternalThreadLocalMap {
private BitSet cleanerFlags; private BitSet cleanerFlags;
/** @deprecated These padding fields will be removed in the future. */
public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8; public long rp1, rp2, rp3, rp4, rp5, rp6, rp7, rp8;
static { static {

View file

@ -0,0 +1,80 @@
package org.xbib.event;
import org.junit.jupiter.api.Test;
import org.xbib.event.bus.Subscribe;
import org.xbib.event.generic.DefaultGenericEvent;
import org.xbib.event.generic.GenericEvent;
import org.xbib.settings.Settings;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;
public class EventManagerTest {
private static final Logger logger = Logger.getLogger(EventManagerTest.class.getName());
@Test
void testGenericEvents() {
Settings settings = Settings.settingsBuilder()
.build();
TestEventConsumer consumer = new TestEventConsumer();
EventManager eventManager = EventManager.builder(settings)
.register(consumer)
.build();
eventManager.getGenericEventManager().post(new DefaultGenericEvent(e -> {
logger.log(Level.INFO, "received event " + e);
}));
}
@Test
void testGenericEventWithWaitForSubscriber() throws InterruptedException, ExecutionException {
Settings settings = Settings.settingsBuilder()
.build();
TestEventConsumer consumer = new TestEventConsumer();
EventManager eventManager = EventManager.builder(settings)
.register(consumer)
.build();
CompletableFuture<GenericEvent> future = new CompletableFuture<>();
eventManager.getGenericEventManager().post(new DefaultGenericEvent(e -> {
logger.log(Level.INFO, "received event " + e);
future.complete(e);
}));
GenericEvent e = future.get();
logger.log(Level.INFO, "the event was received with result " + e + ", continuing");
}
@Test
void testGenericEventWithWaitForAllConsumers() throws ExecutionException, InterruptedException {
Settings settings = Settings.settingsBuilder()
.build();
TestEventConsumer consumer1 = new TestEventConsumer();
TestEventConsumer consumer2 = new TestEventConsumer();
EventManager eventManager = EventManager.builder(settings)
.register(consumer1)
.register(consumer2)
.loadEventConsumers()
.build();
CompletableFuture<Boolean> future = new CompletableFuture<>();
eventManager.getGenericEventManager().post(new DefaultGenericEvent(e -> {
logger.log(Level.INFO, "received event " + e);
}), future);
Boolean b = future.get();
if (b != null && b) {
logger.log(Level.INFO, "the event was received by all consumers, continuing");
}
}
private static class TestEventConsumer implements EventConsumer {
TestEventConsumer() {
}
@Subscribe
public void onEvent(DefaultGenericEvent event) {
event.received();
}
}
}

View file

@ -18,8 +18,4 @@ public class TestClockEventConsumer implements EventConsumer {
void onEvent(TestClockEvent event) { void onEvent(TestClockEvent event) {
logger.log(Level.INFO, "received test clock event on " + Instant.now() + " event instant = " + event.getInstant()); logger.log(Level.INFO, "received test clock event on " + Instant.now() + " event instant = " + event.getInstant());
} }
@Override
public void close() throws IOException {
}
} }

View file

@ -19,8 +19,4 @@ public class TestFileFollowEventConsumer implements EventConsumer {
void onEvent(TestFileFollowEvent event) { void onEvent(TestFileFollowEvent event) {
logger.log(Level.INFO, "received filefollow event path = " + event.getPath() + " content = " + event.getContent()); logger.log(Level.INFO, "received filefollow event path = " + event.getPath() + " content = " + event.getContent());
} }
@Override
public void close() throws IOException {
}
} }

View file

@ -19,7 +19,4 @@ public class TestTimerEventConsumer implements EventConsumer {
logger.log(Level.INFO, "received test timer event on " + Instant.now() + " event instant = " + event.getInstant()); logger.log(Level.INFO, "received test timer event on " + Instant.now() + " event instant = " + event.getInstant());
} }
@Override
public void close() throws IOException {
}
} }