more work on nio event loop

This commit is contained in:
Jörg Prante 2023-09-22 16:09:01 +02:00
parent 19e0a1aaec
commit 73188102fe
5 changed files with 152 additions and 95 deletions

View file

@ -37,32 +37,34 @@ public final class ScheduledFutureTask<V> extends PromiseTask<V> implements Sche
private int queueIndex = INDEX_NOT_IN_QUEUE; private int queueIndex = INDEX_NOT_IN_QUEUE;
public ScheduledFutureTask(AbstractScheduledEventExecutor executor, public ScheduledFutureTask(AbstractScheduledEventExecutor executor,
Runnable runnable, long nanoTime) { Runnable runnable,
long nanoTime) {
super(executor, runnable); super(executor, runnable);
deadlineNanos = nanoTime; deadlineNanos = nanoTime;
periodNanos = 0; periodNanos = 0;
} }
public ScheduledFutureTask(AbstractScheduledEventExecutor executor, public ScheduledFutureTask(AbstractScheduledEventExecutor executor,
Runnable runnable, long nanoTime, long period) { Runnable runnable,
long nanoTime,
long period) {
super(executor, runnable); super(executor, runnable);
deadlineNanos = nanoTime; deadlineNanos = nanoTime;
periodNanos = validatePeriod(period); periodNanos = validatePeriod(period);
} }
public ScheduledFutureTask(AbstractScheduledEventExecutor executor, public ScheduledFutureTask(AbstractScheduledEventExecutor executor,
Callable<V> callable, long nanoTime, long period) { Callable<V> callable,
long nanoTime,
long period) {
super(executor, callable); super(executor, callable);
deadlineNanos = nanoTime; deadlineNanos = nanoTime;
periodNanos = validatePeriod(period); periodNanos = validatePeriod(period);
} }
public ScheduledFutureTask(AbstractScheduledEventExecutor executor, public ScheduledFutureTask(AbstractScheduledEventExecutor executor,
Callable<V> callable, long nanoTime) { Callable<V> callable,
long nanoTime) {
super(executor, callable); super(executor, callable);
deadlineNanos = nanoTime; deadlineNanos = nanoTime;
periodNanos = 0; periodNanos = 0;
@ -203,7 +205,6 @@ public final class ScheduledFutureTask<V> extends PromiseTask<V> implements Sche
protected StringBuilder toStringBuilder() { protected StringBuilder toStringBuilder() {
StringBuilder buf = super.toStringBuilder(); StringBuilder buf = super.toStringBuilder();
buf.setCharAt(buf.length() - 1, ','); buf.setCharAt(buf.length() - 1, ',');
return buf.append(" deadline: ") return buf.append(" deadline: ")
.append(deadlineNanos) .append(deadlineNanos)
.append(", period: ") .append(", period: ")

View file

@ -1,4 +1,3 @@
package org.xbib.event.loop; package org.xbib.event.loop;
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup { public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {

View file

@ -38,8 +38,8 @@ import java.util.logging.Logger;
*/ */
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor { public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS = Math.max(16, static final int DEFAULT_MAX_PENDING_EXECUTOR_TASKS =
Integer.getInteger("org.xbib.eventexecutor.maxPendingTasks", Integer.MAX_VALUE)); Math.max(16, Integer.getInteger("org.xbib.eventexecutor.maxPendingTasks", Integer.MAX_VALUE));
private static final Logger logger = Logger.getLogger(SingleThreadEventExecutor.class.getName()); private static final Logger logger = Logger.getLogger(SingleThreadEventExecutor.class.getName());
@ -49,11 +49,8 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
private static final int ST_SHUTDOWN = 4; private static final int ST_SHUTDOWN = 4;
private static final int ST_TERMINATED = 5; private static final int ST_TERMINATED = 5;
private static final Runnable NOOP_TASK = new Runnable() { private static final Runnable NOOP_TASK = () -> {
@Override
public void run() {
// Do nothing. // Do nothing.
}
}; };
private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER = private static final AtomicIntegerFieldUpdater<SingleThreadEventExecutor> STATE_UPDATER =
@ -71,7 +68,7 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
private volatile boolean interrupted; private volatile boolean interrupted;
private final CountDownLatch threadLock = new CountDownLatch(1); private final CountDownLatch threadLock = new CountDownLatch(1);
private final Set<Runnable> shutdownHooks = new LinkedHashSet<Runnable>(); private final Set<Runnable> shutdownHooks = new LinkedHashSet<>();
private final boolean addTaskWakesUp; private final boolean addTaskWakesUp;
private final int maxPendingTasks; private final int maxPendingTasks;
private final RejectedExecutionHandler rejectedExecutionHandler; private final RejectedExecutionHandler rejectedExecutionHandler;
@ -95,8 +92,9 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
* @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the * @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
* executor thread * executor thread
*/ */
protected SingleThreadEventExecutor( protected SingleThreadEventExecutor(EventExecutorGroup parent,
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { ThreadFactory threadFactory,
boolean addTaskWakesUp) {
this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp); this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp);
} }
@ -110,9 +108,11 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
* @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected. * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected.
* @param rejectedHandler the {@link RejectedExecutionHandler} to use. * @param rejectedHandler the {@link RejectedExecutionHandler} to use.
*/ */
protected SingleThreadEventExecutor( protected SingleThreadEventExecutor(EventExecutorGroup parent,
EventExecutorGroup parent, ThreadFactory threadFactory, ThreadFactory threadFactory,
boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) { boolean addTaskWakesUp,
int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler); this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
} }
@ -124,7 +124,9 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
* @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the * @param addTaskWakesUp {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the
* executor thread * executor thread
*/ */
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, boolean addTaskWakesUp) { protected SingleThreadEventExecutor(EventExecutorGroup parent,
Executor executor,
boolean addTaskWakesUp) {
this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject()); this(parent, executor, addTaskWakesUp, DEFAULT_MAX_PENDING_EXECUTOR_TASKS, RejectedExecutionHandlers.reject());
} }
@ -138,8 +140,10 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
* @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected. * @param maxPendingTasks the maximum number of pending tasks before new tasks will be rejected.
* @param rejectedHandler the {@link RejectedExecutionHandler} to use. * @param rejectedHandler the {@link RejectedExecutionHandler} to use.
*/ */
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, protected SingleThreadEventExecutor(EventExecutorGroup parent,
boolean addTaskWakesUp, int maxPendingTasks, Executor executor,
boolean addTaskWakesUp,
int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) { RejectedExecutionHandler rejectedHandler) {
super(parent); super(parent);
this.addTaskWakesUp = addTaskWakesUp; this.addTaskWakesUp = addTaskWakesUp;
@ -149,8 +153,10 @@ public abstract class SingleThreadEventExecutor extends AbstractScheduledEventEx
rejectedExecutionHandler = Objects.requireNonNull(rejectedHandler, "rejectedHandler"); rejectedExecutionHandler = Objects.requireNonNull(rejectedHandler, "rejectedHandler");
} }
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor, protected SingleThreadEventExecutor(EventExecutorGroup parent,
boolean addTaskWakesUp, Queue<Runnable> taskQueue, Executor executor,
boolean addTaskWakesUp,
Queue<Runnable> taskQueue,
RejectedExecutionHandler rejectedHandler) { RejectedExecutionHandler rejectedHandler) {
super(parent); super(parent);
this.addTaskWakesUp = addTaskWakesUp; this.addTaskWakesUp = addTaskWakesUp;

View file

@ -8,12 +8,15 @@ import org.xbib.event.loop.SingleThreadEventLoop;
import org.xbib.event.util.IntSupplier; import org.xbib.event.util.IntSupplier;
import java.io.IOException; import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.channels.CancelledKeyException; import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectableChannel; import java.nio.channels.SelectableChannel;
import java.nio.channels.Selector; import java.nio.channels.Selector;
import java.nio.channels.SelectionKey; import java.nio.channels.SelectionKey;
import java.nio.channels.spi.SelectorProvider; import java.nio.channels.spi.SelectorProvider;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
@ -104,7 +107,7 @@ public final class NioEventLoop extends SingleThreadEventLoop {
rejectedExecutionHandler); rejectedExecutionHandler);
this.provider = Objects.requireNonNull(selectorProvider, "selectorProvider"); this.provider = Objects.requireNonNull(selectorProvider, "selectorProvider");
this.selectStrategy = Objects.requireNonNull(strategy, "selectStrategy"); this.selectStrategy = Objects.requireNonNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = null; // openSelector(); final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector; this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector; this.unwrappedSelector = selectorTuple.unwrappedSelector;
} }
@ -131,48 +134,36 @@ public final class NioEventLoop extends SingleThreadEventLoop {
} }
} }
/*private SelectorTuple openSelector() { private SelectorTuple openSelector() {
final Selector unwrappedSelector; final Selector unwrappedSelector;
try { try {
unwrappedSelector = provider.openSelector(); unwrappedSelector = provider.openSelector();
} catch (IOException e) { } catch (IOException e) {
throw new ChannelException("failed to open a new selector", e); throw new EventLoopException("failed to open a new selector", e);
} }
if (DISABLE_KEY_SET_OPTIMIZATION) { if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector); return new SelectorTuple(unwrappedSelector);
} }
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() { Class<?> maybeSelectorImplClass = null;
@Override
public Object run() {
try { try {
return Class.forName( maybeSelectorImplClass = Class.forName("sun.nio.ch.SelectorImpl", false,
"sun.nio.ch.SelectorImpl", ClassLoader.getSystemClassLoader());
false, } catch (ClassNotFoundException e) {
PlatformDependent.getSystemClassLoader()); //
} catch (Throwable cause) {
return cause;
} }
}
});
if (!(maybeSelectorImplClass instanceof Class) ||
// ensure the current selector implementation is what we can instrument. // ensure the current selector implementation is what we can instrument.
!((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) { if (maybeSelectorImplClass == null ||
if (maybeSelectorImplClass instanceof Throwable) { !maybeSelectorImplClass.isAssignableFrom(unwrappedSelector.getClass())) {
Throwable t = (Throwable) maybeSelectorImplClass;
logger.log(Level.FINEST, "failed to instrument a special java.util.Set into: " + unwrappedSelector, t);
}
return new SelectorTuple(unwrappedSelector); return new SelectorTuple(unwrappedSelector);
} }
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; final Class<?> selectorImplClass = maybeSelectorImplClass;
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { Object maybeException = null;
@Override
public Object run() {
try { try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
@ -206,25 +197,20 @@ public final class NioEventLoop extends SingleThreadEventLoop {
selectedKeysField.set(unwrappedSelector, selectedKeySet); selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null; return null;
} catch (NoSuchFieldException e) { } catch (NoSuchFieldException | IllegalAccessException e) {
return e; maybeException = e;
} catch (IllegalAccessException e) {
return e;
} }
}
});
if (maybeException instanceof Exception) { if (maybeException instanceof Exception) {
selectedKeys = null; selectedKeys = null;
Exception e = (Exception) maybeException; Exception e = (Exception) maybeException;
logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e); logger.log(Level.FINEST, "failed to instrument a special java.util.Set into: " + unwrappedSelector, e);
return new SelectorTuple(unwrappedSelector); return new SelectorTuple(unwrappedSelector);
} }
selectedKeys = selectedKeySet; selectedKeys = selectedKeySet;
logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector); logger.log(Level.FINEST, "instrumented a special java.util.Set into: " + unwrappedSelector);
return new SelectorTuple(unwrappedSelector, return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet));
new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)); }
}*/
/** /**
* Returns the {@link SelectorProvider} used by this {@link NioEventLoop} to obtain the {@link Selector}. * Returns the {@link SelectorProvider} used by this {@link NioEventLoop} to obtain the {@link Selector}.

View file

@ -0,0 +1,65 @@
package org.xbib.event.loop.nio;
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.Set;
final class SelectedSelectionKeySetSelector extends Selector {
private final SelectedSelectionKeySet selectionKeys;
private final Selector delegate;
SelectedSelectionKeySetSelector(Selector delegate, SelectedSelectionKeySet selectionKeys) {
this.delegate = delegate;
this.selectionKeys = selectionKeys;
}
@Override
public boolean isOpen() {
return delegate.isOpen();
}
@Override
public SelectorProvider provider() {
return delegate.provider();
}
@Override
public Set<SelectionKey> keys() {
return delegate.keys();
}
@Override
public Set<SelectionKey> selectedKeys() {
return delegate.selectedKeys();
}
@Override
public int selectNow() throws IOException {
selectionKeys.reset();
return delegate.selectNow();
}
@Override
public int select(long timeout) throws IOException {
selectionKeys.reset();
return delegate.select(timeout);
}
@Override
public int select() throws IOException {
selectionKeys.reset();
return delegate.select();
}
@Override
public Selector wakeup() {
return delegate.wakeup();
}
@Override
public void close() throws IOException {
delegate.close();
}
}