timer event test

This commit is contained in:
Jörg Prante 2023-10-02 15:39:54 +02:00
parent 7a8ff4bb4a
commit 5a6a8e6d96
10 changed files with 54 additions and 44 deletions

View file

@ -6,6 +6,7 @@ package org.xbib.event.loop;
* thread, the blocking operation will most likely enter a dead lock state, hence throwing this * thread, the blocking operation will most likely enter a dead lock state, hence throwing this
* exception. * exception.
*/ */
@SuppressWarnings("serial")
public class BlockingOperationException extends IllegalStateException { public class BlockingOperationException extends IllegalStateException {
public BlockingOperationException() { } public BlockingOperationException() { }

View file

@ -4,6 +4,7 @@ package org.xbib.event.loop;
* Special {@link RuntimeException} which will be thrown by {@link EventLoop} and {@link EventLoopGroup} * Special {@link RuntimeException} which will be thrown by {@link EventLoop} and {@link EventLoopGroup}
* implementations when an error occurs. * implementations when an error occurs.
*/ */
@SuppressWarnings("serial")
public class EventLoopException extends RuntimeException { public class EventLoopException extends RuntimeException {
public EventLoopException() { public EventLoopException() {

View file

@ -53,7 +53,7 @@ public final class GlobalEventExecutor extends AbstractScheduledEventExecutor im
private GlobalEventExecutor() { private GlobalEventExecutor() {
scheduledTaskQueue().add(quietPeriodTask); scheduledTaskQueue().add(quietPeriodTask);
threadFactory = ThreadExecutorMap.apply(new DefaultThreadFactory( threadFactory = ThreadExecutorMap.applyToThreadFactory(new DefaultThreadFactory(
DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null), this); DefaultThreadFactory.toPoolName(getClass()), false, Thread.NORM_PRIORITY, null), this);
} }

View file

@ -148,7 +148,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
super(parent); super(parent);
this.addTaskWakesUp = addTaskWakesUp; this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks); this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ThreadExecutorMap.apply(executor, this); this.executor = ThreadExecutorMap.applyToExecutor(executor, this);
taskQueue = newTaskQueue(this.maxPendingTasks); taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = Objects.requireNonNull(rejectedHandler, "rejectedHandler"); rejectedExecutionHandler = Objects.requireNonNull(rejectedHandler, "rejectedHandler");
} }
@ -161,7 +161,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
super(parent); super(parent);
this.addTaskWakesUp = addTaskWakesUp; this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS; this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
this.executor = ThreadExecutorMap.apply(executor, this); this.executor = ThreadExecutorMap.applyToExecutor(executor, this);
this.taskQueue = Objects.requireNonNull(taskQueue, "taskQueue"); this.taskQueue = Objects.requireNonNull(taskQueue, "taskQueue");
this.rejectedExecutionHandler = Objects.requireNonNull(rejectedHandler, "rejectedHandler"); this.rejectedExecutionHandler = Objects.requireNonNull(rejectedHandler, "rejectedHandler");
} }
@ -181,7 +181,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
* implementation that does not support blocking operations at all. * implementation that does not support blocking operations at all.
*/ */
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) { protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
return new LinkedBlockingQueue<Runnable>(maxPendingTasks); return new LinkedBlockingQueue<>(maxPendingTasks);
} }
/** /**

View file

@ -356,10 +356,8 @@ public final class NioEventLoop extends SingleThreadEventLoop {
switch (strategy) { switch (strategy) {
case SelectStrategy.CONTINUE: case SelectStrategy.CONTINUE:
continue; continue;
case SelectStrategy.BUSY_WAIT: case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO // fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT: case SelectStrategy.SELECT:
long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) { if (curDeadlineNanos == -1L) {
@ -375,8 +373,6 @@ public final class NioEventLoop extends SingleThreadEventLoop {
// so use of lazySet is ok (no race condition) // so use of lazySet is ok (no race condition)
nextWakeupNanos.lazySet(AWAKE); nextWakeupNanos.lazySet(AWAKE);
} }
// fall through
default:
} }
} catch (IOException e) { } catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild // If we receive an IOException here its because the Selector is messed up. Let's rebuild
@ -428,7 +424,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
logger.log(Level.FINE, CancelledKeyException.class.getSimpleName() + " raised by a Selector "+ selector + " - JDK bug?", e); logger.log(Level.FINE, CancelledKeyException.class.getSimpleName() + " raised by a Selector "+ selector + " - JDK bug?", e);
} }
} catch (Error e) { } catch (Error e) {
throw (Error) e; throw e;
} catch (Throwable t) { } catch (Throwable t) {
handleLoopException(t); handleLoopException(t);
} finally { } finally {
@ -441,7 +437,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
} }
} }
} catch (Error e) { } catch (Error e) {
throw (Error) e; throw e;
} catch (Throwable t) { } catch (Throwable t) {
handleLoopException(t); handleLoopException(t);
} }

View file

@ -42,7 +42,7 @@ public class FastThreadLocal<V> {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v; Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
FastThreadLocal<?>[] variablesToRemoveArray = FastThreadLocal<?>[] variablesToRemoveArray =
variablesToRemove.toArray(new FastThreadLocal[0]); variablesToRemove.toArray(new FastThreadLocal<?>[0]);
for (FastThreadLocal<?> tlv: variablesToRemoveArray) { for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
tlv.remove(threadLocalMap); tlv.remove(threadLocalMap);
} }

View file

@ -33,48 +33,35 @@ public final class ThreadExecutorMap {
* Decorate the given {@link Executor} and ensure {@link #currentExecutor()} will return {@code eventExecutor} * Decorate the given {@link Executor} and ensure {@link #currentExecutor()} will return {@code eventExecutor}
* when called from within the {@link Runnable} during execution. * when called from within the {@link Runnable} during execution.
*/ */
public static Executor apply(final Executor executor, final EventExecutor eventExecutor) { public static Executor applyToExecutor(final Executor executor, final EventExecutor eventExecutor) {
Objects.requireNonNull(executor, "executor"); Objects.requireNonNull(executor, "executor");
Objects.requireNonNull(eventExecutor, "eventExecutor"); Objects.requireNonNull(eventExecutor, "eventExecutor");
return new Executor() { return command -> executor.execute(applyToRunnable(command, eventExecutor));
@Override
public void execute(final Runnable command) {
executor.execute(apply(command, eventExecutor));
}
};
}
/**
* Decorate the given {@link Runnable} and ensure {@link #currentExecutor()} will return {@code eventExecutor}
* when called from within the {@link Runnable} during execution.
*/
public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
Objects.requireNonNull(command, "command");
Objects.requireNonNull(eventExecutor, "eventExecutor");
return new Runnable() {
@Override
public void run() {
setCurrentEventExecutor(eventExecutor);
try {
command.run();
} finally {
setCurrentEventExecutor(null);
}
}
};
} }
/** /**
* Decorate the given {@link ThreadFactory} and ensure {@link #currentExecutor()} will return {@code eventExecutor} * Decorate the given {@link ThreadFactory} and ensure {@link #currentExecutor()} will return {@code eventExecutor}
* when called from within the {@link Runnable} during execution. * when called from within the {@link Runnable} during execution.
*/ */
public static ThreadFactory apply(final ThreadFactory threadFactory, final EventExecutor eventExecutor) { public static ThreadFactory applyToThreadFactory(final ThreadFactory threadFactory, final EventExecutor eventExecutor) {
Objects.requireNonNull(threadFactory, "command"); Objects.requireNonNull(threadFactory, "command");
Objects.requireNonNull(eventExecutor, "eventExecutor"); Objects.requireNonNull(eventExecutor, "eventExecutor");
return new ThreadFactory() { return r -> threadFactory.newThread(applyToRunnable(r, eventExecutor));
@Override }
public Thread newThread(Runnable r) {
return threadFactory.newThread(apply(r, eventExecutor)); /**
* Decorate the given {@link Runnable} and ensure {@link #currentExecutor()} will return {@code eventExecutor}
* when called from within the {@link Runnable} during execution.
*/
public static Runnable applyToRunnable(final Runnable command, final EventExecutor eventExecutor) {
Objects.requireNonNull(command, "command");
Objects.requireNonNull(eventExecutor, "eventExecutor");
return () -> {
setCurrentEventExecutor(eventExecutor);
try {
command.run();
} finally {
setCurrentEventExecutor(null);
} }
}; };
} }

View file

@ -138,7 +138,7 @@ public class EventBusTest {
} }
@Test @Test
public void testSubscriberThrowsExceptionHandlerThrowsException() throws Exception { public void testSubscriberThrowsExceptionHandlerThrowsException() {
final EventBus eventBus = new EventBus((exception, context) -> { final EventBus eventBus = new EventBus((exception, context) -> {
throw new RuntimeException(); throw new RuntimeException();
}); });

View file

@ -0,0 +1,4 @@
package org.xbib.event.timer;
public class TimerEvent {
}

View file

@ -0,0 +1,21 @@
package org.xbib.event.timer;
import org.junit.jupiter.api.Test;
import org.xbib.settings.Settings;
import java.io.IOException;
import java.time.Instant;
import java.util.Map;
public class TimerEventManagerTest {
@Test
public void testEvents() throws IOException {
Settings settings = Settings.settingsBuilder()
.put("event.consumer.testconsumer.type", "org.xbib.event.timer.TimerEventConsumer")
.put("event.consumer.testconsumer.enabled", "true")
.build();
TimerEventManager timerEventManager = new TimerEventManager(settings);
timerEventManager.put("key", Instant.now(), Map.of("a", "b"));
}
}