refactoring housekeeper

This commit is contained in:
Jörg Prante 2023-10-20 15:06:36 +02:00
parent a9fa11e8de
commit 8a4af5274e
4 changed files with 143 additions and 112 deletions

View file

@ -1,5 +1,5 @@
group = org.xbib group = org.xbib
name = database name = database
version = 2.0.0 version = 2.0.1
org.gradle.warning.mode = ALL org.gradle.warning.mode = ALL

View file

@ -0,0 +1,67 @@
package org.xbib.jdbc.connection.pool;
import org.xbib.jdbc.connection.pool.util.BagEntry;
import org.xbib.jdbc.connection.pool.util.ClockSource;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* The house keeping task to retire and maintain minimum idle connections.
*/
final class HouseKeeper implements Runnable {
private static final Logger logger = Logger.getLogger(HouseKeeper.class.getName());
private final Pool pool;
private volatile long previous;
HouseKeeper(Pool pool) {
this.pool = pool;
this.previous = ClockSource.plusMillis(ClockSource.currentTime(), -pool.config.getHousekeepingPeriodMs());
}
@Override
public void run() {
try {
logger.log(Level.FINE, () -> "housekeeper running");
pool.connectionTimeout = pool.config.getConnectionTimeout();
pool.validationTimeout = pool.config.getValidationTimeout();
pool.leakTaskFactory.updateLeakDetectionThreshold(pool.config.getLeakDetectionThreshold());
pool.catalog = (pool.config.getCatalog() != null && !pool.config.getCatalog().equals(pool.catalog)) ? pool.config.getCatalog() : pool.catalog;
final long idleTimeout = pool.config.getIdleTimeout();
final long now = ClockSource.currentTime();
// allowing +128ms as per NTP spec
if (ClockSource.plusMillis(now, 128) < ClockSource.plusMillis(previous, pool.config.getHousekeepingPeriodMs())) {
logger.log(Level.WARNING, "retrograde clock change detected (housekeeper delta=), soft-evicting connections from pool: " +
pool.poolName + " " + ClockSource.elapsedDisplayString(previous, now));
previous = now;
pool.softEvictConnections();
return;
} else if (now > ClockSource.plusMillis(previous, (3 * pool.config.getHousekeepingPeriodMs()) / 2)) {
logger.log(Level.WARNING, "thread starvation or clock leap detected: " +
pool.poolName + " housekeeper delta=" + ClockSource.elapsedDisplayString(previous, now));
}
previous = now;
if (idleTimeout > 0L && pool.config.getMinimumIdle() < pool.config.getMaximumPoolSize()) {
pool.logPoolState("before cleanup");
final List<PoolEntry> notInUse = pool.bag.values(BagEntry.STATE_NOT_IN_USE);
int toRemove = notInUse.size() - pool.config.getMinimumIdle();
for (PoolEntry entry : notInUse) {
if (toRemove > 0 && ClockSource.elapsedMillis(entry.getLastAccessed(), now) > idleTimeout && pool.bag.reserve(entry)) {
pool.closeConnection(entry, "(connection has passed idleTimeout)");
toRemove--;
}
}
pool.logPoolState("after cleanup");
} else {
pool.logPoolState("pool");
}
pool.fillPool();
} catch (Exception e) {
logger.log(Level.SEVERE, "unexpected exception in housekeeping task: " + e.getMessage(), e);
}
}
}

View file

@ -21,7 +21,6 @@ import java.util.Locale;
import java.util.Optional; import java.util.Optional;
import java.util.Properties; import java.util.Properties;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RejectedExecutionHandler;
@ -44,19 +43,19 @@ public class Pool implements BagStateListener {
private static final Logger logger = Logger.getLogger(Pool.class.getName()); private static final Logger logger = Logger.getLogger(Pool.class.getName());
private static final int POOL_NORMAL = 0; static final int POOL_NORMAL = 0;
private static final int POOL_SHUTDOWN = 2; static final int POOL_SHUTDOWN = 2;
private volatile int poolState; volatile int poolState;
private static final String EVICTED_CONNECTION_MESSAGE = "(connection was evicted)"; private static final String EVICTED_CONNECTION_MESSAGE = "(connection was evicted)";
private static final String DEAD_CONNECTION_MESSAGE = "(connection is dead)"; private static final String DEAD_CONNECTION_MESSAGE = "(connection is dead)";
private final PoolEntryCreator poolEntryCreator = new PoolEntryCreator(null); private final PoolEntryCreator poolEntryCreator;
private final PoolEntryCreator postFillPoolEntryCreator = new PoolEntryCreator("after adding "); private final PoolEntryCreator postFillPoolEntryCreator;
private final Collection<Runnable> addConnectionQueueReadOnlyView; private final Collection<Runnable> addConnectionQueueReadOnlyView;
@ -64,32 +63,32 @@ public class Pool implements BagStateListener {
private final ThreadPoolExecutor closeConnectionExecutor; private final ThreadPoolExecutor closeConnectionExecutor;
private final Bag<PoolEntry> bag; final Bag<PoolEntry> bag;
private final ProxyLeakTaskFactory leakTaskFactory; final ProxyLeakTaskFactory leakTaskFactory;
private final ScheduledExecutorService houseKeepingExecutorService; private final ScheduledExecutorService houseKeepingExecutorService;
private ScheduledFuture<?> houseKeeperTask; private ScheduledFuture<?> houseKeeperTask;
private final PoolConfig config; final PoolConfig config;
private final String poolName; final String poolName;
private String catalog; String catalog;
private final AtomicReference<Exception> lastConnectionFailure; private final AtomicReference<Exception> lastConnectionFailure;
private long connectionTimeout;
private long validationTimeout;
private static final int UNINITIALIZED = -1; private static final int UNINITIALIZED = -1;
private static final int TRUE = 1; private static final int TRUE = 1;
private static final int FALSE = 0; private static final int FALSE = 0;
long connectionTimeout;
long validationTimeout;
private int networkTimeout; private int networkTimeout;
private int isNetworkTimeoutSupported; private int isNetworkTimeoutSupported;
@ -139,6 +138,8 @@ public class Pool implements BagStateListener {
this.lastConnectionFailure = new AtomicReference<>(); this.lastConnectionFailure = new AtomicReference<>();
initializeDataSource(); initializeDataSource();
this.bag = new Bag<>(this); this.bag = new Bag<>(this);
this.poolEntryCreator = new PoolEntryCreator(this, null);
this.postFillPoolEntryCreator = new PoolEntryCreator(this, "after adding ");
this.houseKeepingExecutorService = initializeHouseKeepingExecutorService(); this.houseKeepingExecutorService = initializeHouseKeepingExecutorService();
long initializationTimeout = config.getInitializationFailTimeout(); long initializationTimeout = config.getInitializationFailTimeout();
if (initializationTimeout >= 0) { if (initializationTimeout >= 0) {
@ -151,7 +152,7 @@ public class Pool implements BagStateListener {
this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardOldestPolicy()); this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardOldestPolicy());
this.closeConnectionExecutor = createThreadPoolExecutor(maxPoolSize, poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); this.closeConnectionExecutor = createThreadPoolExecutor(maxPoolSize, poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy());
this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService); this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService);
this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, config.getHousekeepingPeriodMs(), TimeUnit.MILLISECONDS); this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(this), 100L, config.getHousekeepingPeriodMs(), TimeUnit.MILLISECONDS);
if (Boolean.getBoolean("org.xbib.jdbc.connection.pool.blockUntilFilled") && config.getInitializationFailTimeout() > 1) { if (Boolean.getBoolean("org.xbib.jdbc.connection.pool.blockUntilFilled") && config.getInitializationFailTimeout() > 1) {
addConnectionExecutor.setCorePoolSize(Math.min(16, Runtime.getRuntime().availableProcessors())); addConnectionExecutor.setCorePoolSize(Math.min(16, Runtime.getRuntime().availableProcessors()));
addConnectionExecutor.setMaximumPoolSize(Math.min(16, Runtime.getRuntime().availableProcessors())); addConnectionExecutor.setMaximumPoolSize(Math.min(16, Runtime.getRuntime().availableProcessors()));
@ -677,7 +678,7 @@ public class Pool implements BagStateListener {
* Creating new poolEntry. If maxLifetime is configured, create a future End-of-life task with 2.5% variance from * Creating new poolEntry. If maxLifetime is configured, create a future End-of-life task with 2.5% variance from
* the maxLifetime time to ensure there is no massive die-off of Connections in the pool. * the maxLifetime time to ensure there is no massive die-off of Connections in the pool.
*/ */
private PoolEntry createPoolEntry() { PoolEntry createPoolEntry() {
try { try {
final PoolEntry poolEntry = newPoolEntry(); final PoolEntry poolEntry = newPoolEntry();
final long maxLifetime = config.getMaxLifetime(); final long maxLifetime = config.getMaxLifetime();
@ -711,7 +712,7 @@ public class Pool implements BagStateListener {
/** /**
* Fill pool up from current idle connections (as they are perceived at the point of execution) to minimumIdle connections. * Fill pool up from current idle connections (as they are perceived at the point of execution) to minimumIdle connections.
*/ */
private synchronized void fillPool() { synchronized void fillPool() {
int connectionsToAdd = Math.min(config.getMaximumPoolSize() - getTotalConnections(), config.getMinimumIdle() - getIdleConnections()) int connectionsToAdd = Math.min(config.getMaximumPoolSize() - getTotalConnections(), config.getMinimumIdle() - getIdleConnections())
- addConnectionQueueReadOnlyView.size(); - addConnectionQueueReadOnlyView.size();
if (connectionsToAdd <= 0) { if (connectionsToAdd <= 0) {
@ -911,7 +912,7 @@ public class Pool implements BagStateListener {
* *
* @param millis the number of milliseconds to sleep * @param millis the number of milliseconds to sleep
*/ */
private static void quietlySleep(long millis) { static void quietlySleep(long millis) {
try { try {
Thread.sleep(millis); Thread.sleep(millis);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -969,98 +970,13 @@ public class Pool implements BagStateListener {
} }
/** /**
* Creating and adding poolEntries (connections) to the pool. * We only create connections if we need another idle connection or have threads still waiting
* for a new connection. Otherwise, we bail out of the request to create.
*
* @return true if we should create a connection, false if the need has disappeared
*/ */
private final class PoolEntryCreator implements Callable<Boolean> { synchronized boolean shouldCreateAnotherConnection() {
return getTotalConnections() < config.getMaximumPoolSize() &&
private final String loggingPrefix; (bag.getWaitingThreadCount() > 0 || getIdleConnections() < config.getMinimumIdle());
private PoolEntryCreator(String loggingPrefix) {
this.loggingPrefix = loggingPrefix;
}
@Override
public Boolean call() {
long sleepBackoff = 250L;
while (poolState == POOL_NORMAL && shouldCreateAnotherConnection()) {
final PoolEntry poolEntry = createPoolEntry();
if (poolEntry != null) {
bag.add(poolEntry);
logger.log(Level.FINE, () -> MessageFormat.format("{0}: added connection {1} ",
poolName, poolEntry.getConnection()));
if (loggingPrefix != null) {
logPoolState(loggingPrefix);
}
return Boolean.TRUE;
}
if (loggingPrefix != null) {
logger.log(Level.FINE, () -> "connection add failed, sleeping with backoff" + poolName);
}
quietlySleep(sleepBackoff);
sleepBackoff = Math.min(TimeUnit.SECONDS.toMillis(10), Math.min(connectionTimeout, (long) (sleepBackoff * 1.5)));
}
return Boolean.FALSE;
}
/**
* We only create connections if we need another idle connection or have threads still waiting
* for a new connection. Otherwise we bail out of the request to create.
*
* @return true if we should create a connection, false if the need has disappeared
*/
private synchronized boolean shouldCreateAnotherConnection() {
return getTotalConnections() < config.getMaximumPoolSize() &&
(bag.getWaitingThreadCount() > 0 || getIdleConnections() < config.getMinimumIdle());
}
}
/**
* The house keeping task to retire and maintain minimum idle connections.
*/
private final class HouseKeeper implements Runnable {
private volatile long previous = ClockSource.plusMillis(ClockSource.currentTime(), -config.getHousekeepingPeriodMs());
@Override
public void run() {
try {
logger.log(Level.FINE, () -> "housekeeper running");
connectionTimeout = config.getConnectionTimeout();
validationTimeout = config.getValidationTimeout();
leakTaskFactory.updateLeakDetectionThreshold(config.getLeakDetectionThreshold());
catalog = (config.getCatalog() != null && !config.getCatalog().equals(catalog)) ? config.getCatalog() : catalog;
final long idleTimeout = config.getIdleTimeout();
final long now = ClockSource.currentTime();
// allowing +128ms as per NTP spec
if (ClockSource.plusMillis(now, 128) < ClockSource.plusMillis(previous, config.getHousekeepingPeriodMs())) {
logger.log(Level.WARNING, "retrograde clock change detected (housekeeper delta=), soft-evicting connections from pool: " +
poolName + " " + ClockSource.elapsedDisplayString(previous, now));
previous = now;
softEvictConnections();
return;
} else if (now > ClockSource.plusMillis(previous, (3 * config.getHousekeepingPeriodMs()) / 2)) {
logger.log(Level.WARNING, "thread starvation or clock leap detected: " +
poolName + " housekeeper delta=" + ClockSource.elapsedDisplayString(previous, now));
}
previous = now;
if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) {
logPoolState("before cleanup");
final List<PoolEntry> notInUse = bag.values(BagEntry.STATE_NOT_IN_USE);
int toRemove = notInUse.size() - config.getMinimumIdle();
for (PoolEntry entry : notInUse) {
if (toRemove > 0 && ClockSource.elapsedMillis(entry.getLastAccessed(), now) > idleTimeout && bag.reserve(entry)) {
closeConnection(entry, "(connection has passed idleTimeout)");
toRemove--;
}
}
logPoolState("after cleanup");
} else {
logPoolState("pool");
}
fillPool();
} catch (Exception e) {
logger.log(Level.SEVERE, "unexpected exception in housekeeping task: " + e.getMessage(), e);
}
}
} }
} }

View file

@ -0,0 +1,48 @@
package org.xbib.jdbc.connection.pool;
import java.text.MessageFormat;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import static org.xbib.jdbc.connection.pool.Pool.POOL_NORMAL;
/**
* Creating and adding poolEntries (connections) to the pool.
*/
public class PoolEntryCreator implements Callable<Boolean> {
private static final Logger logger = Logger.getLogger(PoolEntryCreator.class.getName());
private final Pool pool;
private final String loggingPrefix;
PoolEntryCreator(Pool pool, String loggingPrefix) {
this.pool = pool;
this.loggingPrefix = loggingPrefix;
}
@Override
public Boolean call() {
long sleepBackoff = 250L;
while (pool.poolState == POOL_NORMAL && pool.shouldCreateAnotherConnection()) {
final PoolEntry poolEntry = pool.createPoolEntry();
if (poolEntry != null) {
pool.bag.add(poolEntry);
logger.log(Level.FINE, () -> MessageFormat.format("{0}: added connection {1} ", pool.poolName, poolEntry.getConnection()));
if (loggingPrefix != null) {
pool.logPoolState(loggingPrefix);
}
return Boolean.TRUE;
}
if (loggingPrefix != null) {
logger.log(Level.FINE, () -> "connection add failed, sleeping with backoff" + pool.poolName);
}
Pool.quietlySleep(sleepBackoff);
sleepBackoff = Math.min(TimeUnit.SECONDS.toMillis(10), Math.min(pool.connectionTimeout, (long) (sleepBackoff * 1.5)));
}
return Boolean.FALSE;
}
}