diff --git a/gradle.properties b/gradle.properties index a82d48c..62b20f3 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ group = org.xbib name = database -version = 2.0.0 +version = 2.0.1 org.gradle.warning.mode = ALL diff --git a/jdbc-connection-pool/src/main/java/org/xbib/jdbc/connection/pool/HouseKeeper.java b/jdbc-connection-pool/src/main/java/org/xbib/jdbc/connection/pool/HouseKeeper.java new file mode 100644 index 0000000..18cacaf --- /dev/null +++ b/jdbc-connection-pool/src/main/java/org/xbib/jdbc/connection/pool/HouseKeeper.java @@ -0,0 +1,67 @@ +package org.xbib.jdbc.connection.pool; + +import org.xbib.jdbc.connection.pool.util.BagEntry; +import org.xbib.jdbc.connection.pool.util.ClockSource; + +import java.util.List; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * The house keeping task to retire and maintain minimum idle connections. + */ +final class HouseKeeper implements Runnable { + + private static final Logger logger = Logger.getLogger(HouseKeeper.class.getName()); + + private final Pool pool; + + private volatile long previous; + + HouseKeeper(Pool pool) { + this.pool = pool; + this.previous = ClockSource.plusMillis(ClockSource.currentTime(), -pool.config.getHousekeepingPeriodMs()); + } + + @Override + public void run() { + try { + logger.log(Level.FINE, () -> "housekeeper running"); + pool.connectionTimeout = pool.config.getConnectionTimeout(); + pool.validationTimeout = pool.config.getValidationTimeout(); + pool.leakTaskFactory.updateLeakDetectionThreshold(pool.config.getLeakDetectionThreshold()); + pool.catalog = (pool.config.getCatalog() != null && !pool.config.getCatalog().equals(pool.catalog)) ? pool.config.getCatalog() : pool.catalog; + final long idleTimeout = pool.config.getIdleTimeout(); + final long now = ClockSource.currentTime(); + // allowing +128ms as per NTP spec + if (ClockSource.plusMillis(now, 128) < ClockSource.plusMillis(previous, pool.config.getHousekeepingPeriodMs())) { + logger.log(Level.WARNING, "retrograde clock change detected (housekeeper delta=), soft-evicting connections from pool: " + + pool.poolName + " " + ClockSource.elapsedDisplayString(previous, now)); + previous = now; + pool.softEvictConnections(); + return; + } else if (now > ClockSource.plusMillis(previous, (3 * pool.config.getHousekeepingPeriodMs()) / 2)) { + logger.log(Level.WARNING, "thread starvation or clock leap detected: " + + pool.poolName + " housekeeper delta=" + ClockSource.elapsedDisplayString(previous, now)); + } + previous = now; + if (idleTimeout > 0L && pool.config.getMinimumIdle() < pool.config.getMaximumPoolSize()) { + pool.logPoolState("before cleanup"); + final List notInUse = pool.bag.values(BagEntry.STATE_NOT_IN_USE); + int toRemove = notInUse.size() - pool.config.getMinimumIdle(); + for (PoolEntry entry : notInUse) { + if (toRemove > 0 && ClockSource.elapsedMillis(entry.getLastAccessed(), now) > idleTimeout && pool.bag.reserve(entry)) { + pool.closeConnection(entry, "(connection has passed idleTimeout)"); + toRemove--; + } + } + pool.logPoolState("after cleanup"); + } else { + pool.logPoolState("pool"); + } + pool.fillPool(); + } catch (Exception e) { + logger.log(Level.SEVERE, "unexpected exception in housekeeping task: " + e.getMessage(), e); + } + } +} diff --git a/jdbc-connection-pool/src/main/java/org/xbib/jdbc/connection/pool/Pool.java b/jdbc-connection-pool/src/main/java/org/xbib/jdbc/connection/pool/Pool.java index 150910c..5c17b00 100644 --- a/jdbc-connection-pool/src/main/java/org/xbib/jdbc/connection/pool/Pool.java +++ b/jdbc-connection-pool/src/main/java/org/xbib/jdbc/connection/pool/Pool.java @@ -21,7 +21,6 @@ import java.util.Locale; import java.util.Optional; import java.util.Properties; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; @@ -44,19 +43,19 @@ public class Pool implements BagStateListener { private static final Logger logger = Logger.getLogger(Pool.class.getName()); - private static final int POOL_NORMAL = 0; + static final int POOL_NORMAL = 0; - private static final int POOL_SHUTDOWN = 2; + static final int POOL_SHUTDOWN = 2; - private volatile int poolState; + volatile int poolState; private static final String EVICTED_CONNECTION_MESSAGE = "(connection was evicted)"; private static final String DEAD_CONNECTION_MESSAGE = "(connection is dead)"; - private final PoolEntryCreator poolEntryCreator = new PoolEntryCreator(null); + private final PoolEntryCreator poolEntryCreator; - private final PoolEntryCreator postFillPoolEntryCreator = new PoolEntryCreator("after adding "); + private final PoolEntryCreator postFillPoolEntryCreator; private final Collection addConnectionQueueReadOnlyView; @@ -64,32 +63,32 @@ public class Pool implements BagStateListener { private final ThreadPoolExecutor closeConnectionExecutor; - private final Bag bag; + final Bag bag; - private final ProxyLeakTaskFactory leakTaskFactory; + final ProxyLeakTaskFactory leakTaskFactory; private final ScheduledExecutorService houseKeepingExecutorService; private ScheduledFuture houseKeeperTask; - private final PoolConfig config; + final PoolConfig config; - private final String poolName; + final String poolName; - private String catalog; + String catalog; private final AtomicReference lastConnectionFailure; - private long connectionTimeout; - - private long validationTimeout; - private static final int UNINITIALIZED = -1; private static final int TRUE = 1; private static final int FALSE = 0; + long connectionTimeout; + + long validationTimeout; + private int networkTimeout; private int isNetworkTimeoutSupported; @@ -139,6 +138,8 @@ public class Pool implements BagStateListener { this.lastConnectionFailure = new AtomicReference<>(); initializeDataSource(); this.bag = new Bag<>(this); + this.poolEntryCreator = new PoolEntryCreator(this, null); + this.postFillPoolEntryCreator = new PoolEntryCreator(this, "after adding "); this.houseKeepingExecutorService = initializeHouseKeepingExecutorService(); long initializationTimeout = config.getInitializationFailTimeout(); if (initializationTimeout >= 0) { @@ -151,7 +152,7 @@ public class Pool implements BagStateListener { this.addConnectionExecutor = createThreadPoolExecutor(addConnectionQueue, poolName + " connection adder", threadFactory, new ThreadPoolExecutor.DiscardOldestPolicy()); this.closeConnectionExecutor = createThreadPoolExecutor(maxPoolSize, poolName + " connection closer", threadFactory, new ThreadPoolExecutor.CallerRunsPolicy()); this.leakTaskFactory = new ProxyLeakTaskFactory(config.getLeakDetectionThreshold(), houseKeepingExecutorService); - this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(), 100L, config.getHousekeepingPeriodMs(), TimeUnit.MILLISECONDS); + this.houseKeeperTask = houseKeepingExecutorService.scheduleWithFixedDelay(new HouseKeeper(this), 100L, config.getHousekeepingPeriodMs(), TimeUnit.MILLISECONDS); if (Boolean.getBoolean("org.xbib.jdbc.connection.pool.blockUntilFilled") && config.getInitializationFailTimeout() > 1) { addConnectionExecutor.setCorePoolSize(Math.min(16, Runtime.getRuntime().availableProcessors())); addConnectionExecutor.setMaximumPoolSize(Math.min(16, Runtime.getRuntime().availableProcessors())); @@ -677,7 +678,7 @@ public class Pool implements BagStateListener { * Creating new poolEntry. If maxLifetime is configured, create a future End-of-life task with 2.5% variance from * the maxLifetime time to ensure there is no massive die-off of Connections in the pool. */ - private PoolEntry createPoolEntry() { + PoolEntry createPoolEntry() { try { final PoolEntry poolEntry = newPoolEntry(); final long maxLifetime = config.getMaxLifetime(); @@ -711,7 +712,7 @@ public class Pool implements BagStateListener { /** * Fill pool up from current idle connections (as they are perceived at the point of execution) to minimumIdle connections. */ - private synchronized void fillPool() { + synchronized void fillPool() { int connectionsToAdd = Math.min(config.getMaximumPoolSize() - getTotalConnections(), config.getMinimumIdle() - getIdleConnections()) - addConnectionQueueReadOnlyView.size(); if (connectionsToAdd <= 0) { @@ -911,7 +912,7 @@ public class Pool implements BagStateListener { * * @param millis the number of milliseconds to sleep */ - private static void quietlySleep(long millis) { + static void quietlySleep(long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { @@ -969,98 +970,13 @@ public class Pool implements BagStateListener { } /** - * Creating and adding poolEntries (connections) to the pool. + * We only create connections if we need another idle connection or have threads still waiting + * for a new connection. Otherwise, we bail out of the request to create. + * + * @return true if we should create a connection, false if the need has disappeared */ - private final class PoolEntryCreator implements Callable { - - private final String loggingPrefix; - - private PoolEntryCreator(String loggingPrefix) { - this.loggingPrefix = loggingPrefix; - } - - @Override - public Boolean call() { - long sleepBackoff = 250L; - while (poolState == POOL_NORMAL && shouldCreateAnotherConnection()) { - final PoolEntry poolEntry = createPoolEntry(); - if (poolEntry != null) { - bag.add(poolEntry); - logger.log(Level.FINE, () -> MessageFormat.format("{0}: added connection {1} ", - poolName, poolEntry.getConnection())); - if (loggingPrefix != null) { - logPoolState(loggingPrefix); - } - return Boolean.TRUE; - } - if (loggingPrefix != null) { - logger.log(Level.FINE, () -> "connection add failed, sleeping with backoff" + poolName); - } - quietlySleep(sleepBackoff); - sleepBackoff = Math.min(TimeUnit.SECONDS.toMillis(10), Math.min(connectionTimeout, (long) (sleepBackoff * 1.5))); - } - return Boolean.FALSE; - } - - /** - * We only create connections if we need another idle connection or have threads still waiting - * for a new connection. Otherwise we bail out of the request to create. - * - * @return true if we should create a connection, false if the need has disappeared - */ - private synchronized boolean shouldCreateAnotherConnection() { - return getTotalConnections() < config.getMaximumPoolSize() && - (bag.getWaitingThreadCount() > 0 || getIdleConnections() < config.getMinimumIdle()); - } - } - - /** - * The house keeping task to retire and maintain minimum idle connections. - */ - private final class HouseKeeper implements Runnable { - - private volatile long previous = ClockSource.plusMillis(ClockSource.currentTime(), -config.getHousekeepingPeriodMs()); - - @Override - public void run() { - try { - logger.log(Level.FINE, () -> "housekeeper running"); - connectionTimeout = config.getConnectionTimeout(); - validationTimeout = config.getValidationTimeout(); - leakTaskFactory.updateLeakDetectionThreshold(config.getLeakDetectionThreshold()); - catalog = (config.getCatalog() != null && !config.getCatalog().equals(catalog)) ? config.getCatalog() : catalog; - final long idleTimeout = config.getIdleTimeout(); - final long now = ClockSource.currentTime(); - // allowing +128ms as per NTP spec - if (ClockSource.plusMillis(now, 128) < ClockSource.plusMillis(previous, config.getHousekeepingPeriodMs())) { - logger.log(Level.WARNING, "retrograde clock change detected (housekeeper delta=), soft-evicting connections from pool: " + - poolName + " " + ClockSource.elapsedDisplayString(previous, now)); - previous = now; - softEvictConnections(); - return; - } else if (now > ClockSource.plusMillis(previous, (3 * config.getHousekeepingPeriodMs()) / 2)) { - logger.log(Level.WARNING, "thread starvation or clock leap detected: " + - poolName + " housekeeper delta=" + ClockSource.elapsedDisplayString(previous, now)); - } - previous = now; - if (idleTimeout > 0L && config.getMinimumIdle() < config.getMaximumPoolSize()) { - logPoolState("before cleanup"); - final List notInUse = bag.values(BagEntry.STATE_NOT_IN_USE); - int toRemove = notInUse.size() - config.getMinimumIdle(); - for (PoolEntry entry : notInUse) { - if (toRemove > 0 && ClockSource.elapsedMillis(entry.getLastAccessed(), now) > idleTimeout && bag.reserve(entry)) { - closeConnection(entry, "(connection has passed idleTimeout)"); - toRemove--; - } - } - logPoolState("after cleanup"); - } else { - logPoolState("pool"); - } - fillPool(); - } catch (Exception e) { - logger.log(Level.SEVERE, "unexpected exception in housekeeping task: " + e.getMessage(), e); - } - } + synchronized boolean shouldCreateAnotherConnection() { + return getTotalConnections() < config.getMaximumPoolSize() && + (bag.getWaitingThreadCount() > 0 || getIdleConnections() < config.getMinimumIdle()); } } diff --git a/jdbc-connection-pool/src/main/java/org/xbib/jdbc/connection/pool/PoolEntryCreator.java b/jdbc-connection-pool/src/main/java/org/xbib/jdbc/connection/pool/PoolEntryCreator.java new file mode 100644 index 0000000..758758f --- /dev/null +++ b/jdbc-connection-pool/src/main/java/org/xbib/jdbc/connection/pool/PoolEntryCreator.java @@ -0,0 +1,48 @@ +package org.xbib.jdbc.connection.pool; + +import java.text.MessageFormat; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; +import java.util.logging.Level; +import java.util.logging.Logger; + +import static org.xbib.jdbc.connection.pool.Pool.POOL_NORMAL; + +/** + * Creating and adding poolEntries (connections) to the pool. + */ +public class PoolEntryCreator implements Callable { + + private static final Logger logger = Logger.getLogger(PoolEntryCreator.class.getName()); + + private final Pool pool; + + private final String loggingPrefix; + + PoolEntryCreator(Pool pool, String loggingPrefix) { + this.pool = pool; + this.loggingPrefix = loggingPrefix; + } + + @Override + public Boolean call() { + long sleepBackoff = 250L; + while (pool.poolState == POOL_NORMAL && pool.shouldCreateAnotherConnection()) { + final PoolEntry poolEntry = pool.createPoolEntry(); + if (poolEntry != null) { + pool.bag.add(poolEntry); + logger.log(Level.FINE, () -> MessageFormat.format("{0}: added connection {1} ", pool.poolName, poolEntry.getConnection())); + if (loggingPrefix != null) { + pool.logPoolState(loggingPrefix); + } + return Boolean.TRUE; + } + if (loggingPrefix != null) { + logger.log(Level.FINE, () -> "connection add failed, sleeping with backoff" + pool.poolName); + } + Pool.quietlySleep(sleepBackoff); + sleepBackoff = Math.min(TimeUnit.SECONDS.toMillis(10), Math.min(pool.connectionTimeout, (long) (sleepBackoff * 1.5))); + } + return Boolean.FALSE; + } +}