relax pool dependency

This commit is contained in:
Jörg Prante 2024-12-29 16:47:00 +01:00
parent 844b288a18
commit ac788cb95b
61 changed files with 12647 additions and 69 deletions

View file

@ -1,3 +1,3 @@
group = org.xbib
name = database
version = 2.2.0
version = 2.3.0

View file

@ -3,7 +3,6 @@ import org.xbib.jdbc.query.Flavor;
module org.xbib.jdbc.oracle {
requires org.xbib.jdbc.query;
requires org.xbib.jdbc.connection.pool;
requires com.oracle.database.jdbc;
requires java.sql;
uses Flavor;

View file

@ -1,7 +1,6 @@
package org.xbib.jdbc.oracle;
import oracle.jdbc.OraclePreparedStatement;
import org.xbib.jdbc.connection.pool.ProxyPreparedStatement;
import org.xbib.jdbc.query.Flavor;
import java.sql.PreparedStatement;
@ -45,7 +44,13 @@ public class Oracle implements Flavor {
return "binary_float";
}
@Override
public void setFloat(PreparedStatement preparedStatement, int i, Float floatValue) throws SQLException {
if (preparedStatement instanceof OraclePreparedStatement) {
((OraclePreparedStatement) preparedStatement).setBinaryFloat(i, floatValue);
}
}
/*@Override
public void setFloat(PreparedStatement preparedStatement, int i, Float floatValue) throws SQLException {
if (preparedStatement instanceof ProxyPreparedStatement) {
ProxyPreparedStatement proxyPreparedStatement = (ProxyPreparedStatement) preparedStatement;
@ -53,14 +58,20 @@ public class Oracle implements Flavor {
} else {
((OraclePreparedStatement) preparedStatement).setBinaryFloat(i, floatValue);
}
}
}*/
@Override
public String typeDouble() {
return "binary_double";
}
@Override
public void setDouble(PreparedStatement preparedStatement, int i, Double doubleValue) throws SQLException {
if (preparedStatement instanceof OraclePreparedStatement) {
((OraclePreparedStatement) preparedStatement).setBinaryDouble(i, doubleValue);
}
}
/*@Override
public void setDouble(PreparedStatement preparedStatement, int i, Double doubleValue) throws SQLException {
if (preparedStatement instanceof ProxyPreparedStatement) {
ProxyPreparedStatement proxyPreparedStatement = (ProxyPreparedStatement) preparedStatement;
@ -68,7 +79,7 @@ public class Oracle implements Flavor {
} else {
((OraclePreparedStatement) preparedStatement).setBinaryDouble(i, doubleValue);
}
}
}*/
@Override
public String typeBigDecimal(int size, int precision) {

View file

@ -0,0 +1,13 @@
module org.xbib.jdbc.pool {
uses org.xbib.jdbc.pool.api.XbibDataSourceProvider;
requires java.security.jgss;
requires java.sql;
requires java.transaction.xa;
exports org.xbib.jdbc.pool.api;
exports org.xbib.jdbc.pool.api.cache;
exports org.xbib.jdbc.pool.api.configuration;
exports org.xbib.jdbc.pool.api.exceptionsorter;
exports org.xbib.jdbc.pool.api.security;
exports org.xbib.jdbc.pool.api.transaction;
exports org.xbib.jdbc.pool;
}

View file

@ -0,0 +1,312 @@
package org.xbib.jdbc.pool;
import java.lang.reflect.InvocationTargetException;
import java.security.Principal;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import javax.sql.XAConnection;
import org.xbib.jdbc.pool.api.XbibDataSourceListener;
import org.xbib.jdbc.pool.api.configuration.XbibConnectionFactoryConfiguration;
import org.xbib.jdbc.pool.api.security.XbibSecurityProvider;
import org.xbib.jdbc.pool.api.transaction.TransactionIntegration.ResourceRecoveryFactory;
import org.xbib.jdbc.pool.util.PropertyInjector;
import org.xbib.jdbc.pool.util.XAConnectionAdaptor;
import static org.xbib.jdbc.pool.api.configuration.XbibConnectionFactoryConfiguration.TransactionIsolation.UNDEFINED;
import static org.xbib.jdbc.pool.util.ListenerHelper.fireOnWarning;
public final class ConnectionFactory implements ResourceRecoveryFactory {
private static final String URL_PROPERTY_NAME = "url";
private static final Properties EMPTY_PROPERTIES = new Properties();
private final XbibConnectionFactoryConfiguration configuration;
private final XbibDataSourceListener[] listeners;
private final Properties jdbcProperties = new Properties(); // backup of jdbcProperties for DRIVER mode
private final Mode factoryMode;
// these are the sources for connections, that will be used depending on the mode
private java.sql.Driver driver;
private javax.sql.DataSource dataSource;
private javax.sql.XADataSource xaDataSource;
private javax.sql.XADataSource xaRecoveryDataSource;
private PropertyInjector injector;
private Integer defaultIsolationLevel;
public ConnectionFactory(XbibConnectionFactoryConfiguration configuration, XbibDataSourceListener... listeners) {
this.configuration = configuration;
this.listeners = listeners;
factoryMode = Mode.fromClass(configuration.connectionProviderClass());
switch (factoryMode) {
case XA_DATASOURCE:
injector = new PropertyInjector(configuration.connectionProviderClass());
Properties xaProperties = configuration.xaProperties().isEmpty() ? configuration.jdbcProperties() : configuration.xaProperties();
xaDataSource = newXADataSource(xaProperties);
xaRecoveryDataSource = newXADataSource(xaProperties);
break;
case DATASOURCE:
injector = new PropertyInjector(configuration.connectionProviderClass());
dataSource = newDataSource(configuration.jdbcProperties());
break;
case DRIVER:
driver = newDriver();
jdbcProperties.putAll(configuration.jdbcProperties());
break;
}
}
public int defaultJdbcIsolationLevel() {
return defaultIsolationLevel == null ? UNDEFINED.level() : defaultIsolationLevel;
}
@SuppressWarnings("StringConcatenation")
private javax.sql.XADataSource newXADataSource(Properties properties) {
javax.sql.XADataSource newDataSource;
try {
newDataSource = configuration.connectionProviderClass().asSubclass(javax.sql.XADataSource.class).getDeclaredConstructor().newInstance();
} catch (IllegalAccessException | InstantiationException | InvocationTargetException |
NoSuchMethodException e) {
throw new RuntimeException("Unable to instantiate javax.sql.XADataSource", e);
}
if (configuration.jdbcUrl() != null && !configuration.jdbcUrl().isEmpty()) {
injectUrlProperty(newDataSource, URL_PROPERTY_NAME, configuration.jdbcUrl());
}
try {
newDataSource.setLoginTimeout((int) configuration.loginTimeout().getSeconds());
} catch (SQLException e) {
fireOnWarning(listeners, "Unable to set login timeout: " + e.getMessage());
}
injectJdbcProperties(newDataSource, properties);
return newDataSource;
}
@SuppressWarnings("StringConcatenation")
private javax.sql.DataSource newDataSource(Properties properties) {
javax.sql.DataSource newDataSource;
try {
newDataSource = configuration.connectionProviderClass().asSubclass(javax.sql.DataSource.class).getDeclaredConstructor().newInstance();
} catch (IllegalAccessException | InstantiationException | InvocationTargetException |
NoSuchMethodException e) {
throw new RuntimeException("Unable to instantiate javax.sql.DataSource", e);
}
if (configuration.jdbcUrl() != null && !configuration.jdbcUrl().isEmpty()) {
injectUrlProperty(newDataSource, URL_PROPERTY_NAME, configuration.jdbcUrl());
}
try {
newDataSource.setLoginTimeout((int) configuration.loginTimeout().getSeconds());
} catch (SQLException e) {
fireOnWarning(listeners, "Unable to set login timeout: " + e.getMessage());
}
injectJdbcProperties(newDataSource, properties);
return newDataSource;
}
@SuppressWarnings("StringConcatenation")
private java.sql.Driver newDriver() {
DriverManager.setLoginTimeout((int) configuration.loginTimeout().getSeconds());
if (configuration.connectionProviderClass() == null) {
try {
return driver = DriverManager.getDriver(configuration.jdbcUrl());
} catch (SQLException sql) {
throw new RuntimeException("Unable to get java.sql.Driver from DriverManager", sql);
}
} else {
try {
driver = configuration.connectionProviderClass().asSubclass(java.sql.Driver.class).getDeclaredConstructor().newInstance();
if (!driver.acceptsURL(configuration.jdbcUrl())) {
fireOnWarning(listeners, "Driver does not support the provided URL: " + configuration.jdbcUrl());
}
return driver;
} catch (IllegalAccessException | InstantiationException | InvocationTargetException |
NoSuchMethodException e) {
throw new RuntimeException("Unable to instantiate java.sql.Driver", e);
} catch (SQLException e) {
throw new RuntimeException("Unable to verify that the java.sql.Driver supports the provided URL", e);
}
}
}
@SuppressWarnings("StringConcatenation")
private void injectUrlProperty(Object target, String propertyName, String propertyValue) {
try {
injector.inject(target, propertyName, propertyValue);
} catch (IllegalArgumentException | IllegalAccessException | InvocationTargetException |
NoSuchMethodException e) {
// AG-134 - Some drivers have setURL() instead of setUrl(), so we retry with upper case.
// AG-228 - Not all (XA)DataSource's require an url to be set
if (propertyName.chars().anyMatch(Character::isLowerCase)) {
injectUrlProperty(target, propertyName.toUpperCase(Locale.ROOT), propertyValue);
}
}
}
@SuppressWarnings({"ObjectAllocationInLoop", "StringConcatenation"})
private void injectJdbcProperties(Object target, Properties properties) {
boolean ignoring = false;
for (String propertyName : properties.stringPropertyNames()) {
try {
injector.inject(target, propertyName, properties.getProperty(propertyName));
} catch (IllegalArgumentException | IllegalAccessException | InvocationTargetException |
NoSuchMethodException e) {
fireOnWarning(listeners, "Ignoring property '" + propertyName + "': " + e.getMessage());
ignoring = true;
}
}
if (ignoring) {
fireOnWarning(listeners, "Available properties " + Arrays.toString(injector.availableProperties().toArray()));
}
}
// --- //
private Properties jdbcProperties() {
Properties properties = new Properties();
properties.putAll(jdbcProperties);
properties.putAll(securityProperties(configuration.principal(), configuration.credentials()));
return properties;
}
private Properties recoveryProperties() {
Properties properties = new Properties();
if (hasRecoveryCredentials()) {
properties.putAll(securityProperties(configuration.recoveryPrincipal(), configuration.recoveryCredentials()));
} else {
// use the main credentials when recovery credentials are not provided
properties.putAll(securityProperties(configuration.principal(), configuration.credentials()));
}
return properties;
}
private Properties securityProperties(Principal principal, Iterable<Object> credentials) {
Properties properties = new Properties();
properties.putAll(securityProperties(principal));
for (Object credential : credentials) {
properties.putAll(securityProperties(credential));
}
return properties;
}
private Properties securityProperties(Object securityObject) {
if (securityObject == null) {
return EMPTY_PROPERTIES;
}
for (XbibSecurityProvider provider : configuration.securityProviders()) {
Properties properties = provider.getSecurityProperties(securityObject);
if (properties != null) {
return properties;
}
}
fireOnWarning(listeners, "Unknown security object of type: " + securityObject.getClass().getName());
return EMPTY_PROPERTIES;
}
// --- //
public XAConnection createConnection() throws SQLException {
switch (factoryMode) {
case DRIVER:
return new XAConnectionAdaptor(connectionSetup(driver.connect(configuration.jdbcUrl(), jdbcProperties())));
case DATASOURCE:
injectJdbcProperties(dataSource, securityProperties(configuration.principal(), configuration.credentials()));
return new XAConnectionAdaptor(connectionSetup(dataSource.getConnection()));
case XA_DATASOURCE:
injectJdbcProperties(xaDataSource, securityProperties(configuration.principal(), configuration.credentials()));
return xaConnectionSetup(xaDataSource.getXAConnection());
default:
throw new SQLException("Unknown connection factory mode");
}
}
@SuppressWarnings("MagicConstant")
private Connection connectionSetup(Connection connection) throws SQLException {
if (connection == null) {
// AG-90: Driver can return null if the URL is not supported (see java.sql.Driver#connect() documentation)
throw new SQLException("Driver does not support the provided URL: " + configuration.jdbcUrl());
}
connection.setAutoCommit(configuration.autoCommit());
if (configuration.jdbcTransactionIsolation().isDefined()) {
connection.setTransactionIsolation(configuration.jdbcTransactionIsolation().level());
} else if (defaultIsolationLevel == null) {
defaultIsolationLevel = connection.getTransactionIsolation();
}
if (configuration.initialSql() != null && !configuration.initialSql().isEmpty()) {
try (Statement statement = connection.createStatement()) {
statement.execute(configuration.initialSql());
}
}
return connection;
}
private XAConnection xaConnectionSetup(XAConnection xaConnection) throws SQLException {
if (xaConnection.getXAResource() == null) {
// Make sure that XAConnections are not processed as non-XA connections by the pool
xaConnection.close();
throw new SQLException("null XAResource from XADataSource");
}
try (Connection connection = xaConnection.getConnection()) {
connectionSetup(connection);
}
return xaConnection;
}
// --- //
public boolean hasRecoveryCredentials() {
return configuration.recoveryPrincipal() != null || (configuration.recoveryCredentials() != null && !configuration.recoveryCredentials().isEmpty());
}
@Override
public boolean isRecoverable() {
if (factoryMode == Mode.XA_DATASOURCE) {
return true;
}
fireOnWarning(listeners, "Recovery connections are only available for XADataSource");
return false;
}
@Override
public XAConnection getRecoveryConnection() throws SQLException {
if (isRecoverable()) {
injectJdbcProperties(xaRecoveryDataSource, recoveryProperties());
return xaRecoveryDataSource.getXAConnection();
}
// Fallback for wrong implemented TransactionIntegration
throw new SQLException("Recovery connections are only available for XADataSource");
}
// --- //
private enum Mode {
DRIVER, DATASOURCE, XA_DATASOURCE;
@SuppressWarnings("WeakerAccess")
static Mode fromClass(Class<?> providerClass) {
if (providerClass == null) {
return DRIVER;
} else if (javax.sql.XADataSource.class.isAssignableFrom(providerClass)) {
return XA_DATASOURCE;
} else if (javax.sql.DataSource.class.isAssignableFrom(providerClass)) {
return DATASOURCE;
} else if (java.sql.Driver.class.isAssignableFrom(providerClass)) {
return DRIVER;
} else {
throw new IllegalArgumentException("Unable to create ConnectionFactory from providerClass " + providerClass.getName());
}
}
}
}

View file

@ -0,0 +1,401 @@
package org.xbib.jdbc.pool;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.time.Duration;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import javax.sql.XAConnection;
import javax.transaction.xa.XAResource;
import org.xbib.jdbc.pool.api.cache.Acquirable;
import org.xbib.jdbc.pool.api.configuration.XbibConnectionFactoryConfiguration;
import org.xbib.jdbc.pool.api.configuration.XbibConnectionPoolConfiguration;
import org.xbib.jdbc.pool.api.transaction.TransactionAware;
import org.xbib.jdbc.pool.util.AutoCloseableElement;
import org.xbib.jdbc.pool.util.UncheckedArrayList;
import org.xbib.jdbc.pool.wrapper.ConnectionWrapper;
import org.xbib.jdbc.pool.wrapper.XAConnectionWrapper;
import static java.lang.System.nanoTime;
import static java.lang.Thread.currentThread;
import static java.util.Arrays.copyOfRange;
import static java.util.EnumSet.noneOf;
import static java.util.concurrent.atomic.AtomicReferenceFieldUpdater.newUpdater;
import static org.xbib.jdbc.pool.ConnectionHandler.DirtyAttribute.AUTOCOMMIT;
import static org.xbib.jdbc.pool.ConnectionHandler.DirtyAttribute.TRANSACTION_ISOLATION;
import static org.xbib.jdbc.pool.util.ListenerHelper.fireOnWarning;
public final class ConnectionHandler implements TransactionAware, Acquirable {
private static final AtomicReferenceFieldUpdater<ConnectionHandler, State> stateUpdater = newUpdater(ConnectionHandler.class, State.class, "state");
private static final SQLCallable<Boolean> NO_ACTIVE_TRANSACTION = () -> false;
private final XAConnection xaConnection;
// Single Connection reference from xaConnection.getConnection()
private final Connection connection;
// Single XAResource reference from xaConnection.getXAResource(). Can be null for no XA datasources.
private final XAResource xaResource;
private final Pool connectionPool;
// attributes that need to be reset when the connection is returned
private final Set<DirtyAttribute> dirtyAttributes = noneOf(DirtyAttribute.class);
// collection of wrappers created while enlisted in the current transaction
private final AutoCloseableElement enlistedOpenWrappers = AutoCloseableElement.newHead();
// Can use annotation to get (in theory) a little better performance
// @Contended
private volatile State state = State.NEW;
// for leak detection (only valid for CHECKED_OUT connections)
private Thread holdingThread;
// Enhanced leak report
@SuppressWarnings("VolatileArrayField")
private volatile StackTraceElement[] acquisitionStackTrace;
private StackTraceElement[] lastOperationStackTrace;
private List<String> connectionOperations;
// for expiration (CHECKED_IN connections) and leak detection (CHECKED_OUT connections)
private long lastAccess;
// flag to indicate that this the connection is enlisted to a transaction
private boolean enlisted;
// reference to the task that flushes this connection when it gets over it's maxLifetime
private Future<?> maxLifetimeTask;
// Callback set by the transaction integration layer to prevent deferred enlistment
// If the connection is not associated with a transaction and an operation occurs within the bounds of a transaction, an SQLException is thrown
// If there is no transaction integration this should just return false
private SQLCallable<Boolean> transactionActiveCheck = NO_ACTIVE_TRANSACTION;
public ConnectionHandler(XAConnection xa, Pool pool) throws SQLException {
xaConnection = xa;
connection = xaConnection.getConnection();
xaResource = xaConnection.getXAResource();
connectionPool = pool;
touch();
}
public XAConnectionWrapper xaConnectionWrapper() {
return new XAConnectionWrapper(this, xaConnection, connectionPool.getConfiguration().connectionFactoryConfiguration().trackJdbcResources());
}
public ConnectionWrapper connectionWrapper() {
return new ConnectionWrapper(this, connectionPool.getConfiguration().connectionFactoryConfiguration().trackJdbcResources(), enlisted ? enlistedOpenWrappers : null);
}
public ConnectionWrapper detachedWrapper() {
return new ConnectionWrapper(this, connectionPool.getConfiguration().connectionFactoryConfiguration().trackJdbcResources(), true);
}
@SuppressWarnings("StringConcatenation")
public void onConnectionWrapperClose(ConnectionWrapper wrapper, ConnectionWrapper.JdbcResourcesLeakReport leakReport) throws SQLException {
if (leakReport.hasLeak()) {
fireOnWarning(connectionPool.getListeners(), "JDBC resources leaked: " + leakReport.resultSetCount() + " ResultSet(s) and " + leakReport.statementCount() + " Statement(s)");
}
if (!enlisted && !wrapper.isDetached()) {
transactionEnd();
}
}
public Connection rawConnection() {
return connection;
}
public XAResource getXaResource() {
return xaResource;
}
@SuppressWarnings("MagicConstant")
public void resetConnection() throws SQLException {
transactionActiveCheck = NO_ACTIVE_TRANSACTION;
if (!dirtyAttributes.isEmpty()) {
XbibConnectionFactoryConfiguration connectionFactoryConfiguration = connectionPool.getConfiguration().connectionFactoryConfiguration();
try {
if (dirtyAttributes.contains(AUTOCOMMIT)) {
connection.setAutoCommit(connectionFactoryConfiguration.autoCommit());
}
if (dirtyAttributes.contains(TRANSACTION_ISOLATION)) {
XbibConnectionFactoryConfiguration.IsolationLevel isolation = connectionFactoryConfiguration.jdbcTransactionIsolation();
connection.setTransactionIsolation(isolation.isDefined() ? isolation.level() : connectionPool.defaultJdbcIsolationLevel());
}
// other attributes do not have default values in connectionFactoryConfiguration
} catch (SQLException se) {
setFlushOnly(se);
throw se;
} finally {
dirtyAttributes.clear();
}
}
try {
SQLWarning warning = connection.getWarnings();
if (warning != null) {
XbibConnectionPoolConfiguration.ExceptionSorter exceptionSorter = connectionPool.getConfiguration().exceptionSorter();
while (warning != null) {
if (exceptionSorter != null && exceptionSorter.isFatal(warning)) {
setState(State.FLUSH);
}
warning = warning.getNextWarning();
}
connection.clearWarnings();
}
} catch (SQLException se) {
setFlushOnly(se);
throw se;
// keep errors
}
}
public void closeConnection() throws SQLException {
if (maxLifetimeTask != null && !maxLifetimeTask.isDone()) {
maxLifetimeTask.cancel(false);
}
maxLifetimeTask = null;
try {
State observedState = stateUpdater.get(this);
if (observedState != State.FLUSH) {
throw new SQLException("Closing connection in incorrect state " + observedState);
}
} finally {
try {
xaConnection.close();
} finally {
stateUpdater.set(this, State.DESTROYED);
}
}
}
public boolean acquire() {
return setState(State.CHECKED_IN, State.CHECKED_OUT);
}
public boolean isAcquirable() {
State observedState = stateUpdater.get(this);
return observedState != State.FLUSH && observedState != State.DESTROYED;
}
public boolean setState(State expected, State newState) {
if (expected == State.DESTROYED) {
throw new IllegalArgumentException("Trying to move out of state DESTROYED");
}
switch (newState) {
case NEW:
throw new IllegalArgumentException("Trying to set invalid state NEW");
case CHECKED_IN:
case CHECKED_OUT:
case VALIDATION:
case FLUSH:
case DESTROYED:
return stateUpdater.compareAndSet(this, expected, newState);
default:
throw new IllegalArgumentException("Trying to set invalid state " + newState);
}
}
public void setState(State newState) {
// Maybe could use lazySet here, but there doesn't seem to be any performance advantage
stateUpdater.set(this, newState);
}
private boolean isActive() {
return stateUpdater.get(this) == State.CHECKED_OUT;
}
public void touch() {
lastAccess = nanoTime();
}
public boolean isLeak(Duration timeout) {
return isActive() && !enlisted && isIdle(timeout);
}
public boolean isIdle(Duration timeout) {
return nanoTime() - lastAccess > timeout.toNanos();
}
public void setMaxLifetimeTask(Future<?> maxLifetimeTask) {
this.maxLifetimeTask = maxLifetimeTask;
}
public boolean isValid() {
return connectionPool.getConfiguration().connectionValidator().isValid(detachedWrapper());
}
// --- Leak detection //
public Thread getHoldingThread() {
return holdingThread;
}
public void setHoldingThread(Thread holdingThread) {
this.holdingThread = holdingThread;
}
// --- Enhanced leak report //
/**
* Abbreviated list of all operation on the connection, for enhanced leak report
*/
@SuppressWarnings("VariableNotUsedInsideIf")
public void traceConnectionOperation(String operation) {
if (acquisitionStackTrace != null) {
connectionOperations.add(operation);
lastOperationStackTrace = currentThread().getStackTrace();
}
}
/**
* Abbreviated list of all operation on the connection, for enhanced leak report
*/
public List<String> getConnectionOperations() {
return connectionOperations;
}
/**
* Stack trace of the first acquisition for this connection
*/
public StackTraceElement[] getAcquisitionStackTrace() {
return acquisitionStackTrace == null ? null : copyOfRange(acquisitionStackTrace, 4, acquisitionStackTrace.length);
}
/**
* Stores a stack trace for leak report. Setting a value != null also enables tracing of operations on the connection
*/
public void setAcquisitionStackTrace(StackTraceElement[] stackTrace) {
lastOperationStackTrace = null;
if (connectionOperations == null) {
connectionOperations = new UncheckedArrayList<>(String.class);
}
connectionOperations.clear();
acquisitionStackTrace = stackTrace;
}
/**
* Stack trace for the last operation on this connection
*/
public StackTraceElement[] getLastOperationStackTrace() {
return lastOperationStackTrace == null ? null : copyOfRange(lastOperationStackTrace, 3, lastOperationStackTrace.length);
}
public void setDirtyAttribute(DirtyAttribute attribute) {
dirtyAttributes.add(attribute);
}
public boolean isEnlisted() {
return enlisted;
}
// --- TransactionAware //
@Override
public Connection getConnection() {
return detachedWrapper();
}
@Override
public void transactionStart() throws SQLException {
try {
if (!enlisted && connection.getAutoCommit()) {
connection.setAutoCommit(false);
setDirtyAttribute(AUTOCOMMIT);
}
enlisted = true;
} catch (SQLException se) {
setFlushOnly(se);
throw se;
}
}
@Override
public void transactionBeforeCompletion(boolean successful) {
if (enlistedOpenWrappers.closeAllAutocloseableElements() != 0) {
if (successful) {
fireOnWarning(connectionPool.getListeners(), "Closing open connection(s) prior to commit");
} else {
// AG-168 - Close without warning as Synchronization.beforeCompletion is only invoked on success. See issue for more details.
// fireOnWarning( connectionPool.getListeners(), "Closing open connection prior to rollback" );
}
}
}
@Override
public void transactionCommit() throws SQLException {
verifyEnlistment();
try {
connection.commit();
} catch (SQLException se) {
setFlushOnly(se);
throw se;
}
}
@Override
public void transactionRollback() throws SQLException {
verifyEnlistment();
try {
connection.rollback();
} catch (SQLException se) {
setFlushOnly(se);
throw se;
}
}
@Override
public void transactionEnd() throws SQLException {
if (enlistedOpenWrappers.closeAllAutocloseableElements() != 0) {
// should never happen, but it's here as a safeguard to prevent double returns in all cases.
fireOnWarning(connectionPool.getListeners(), "Closing open connection(s) on after completion");
}
enlisted = false;
connectionPool.returnConnectionHandler(this);
}
@Override
public void transactionCheckCallback(SQLCallable<Boolean> transactionCheck) {
transactionActiveCheck = transactionCheck;
}
public void verifyEnlistment() throws SQLException {
if (!enlisted && transactionActiveCheck.call()) {
throw new SQLException("Deferred enlistment not supported");
}
if (enlisted && !transactionActiveCheck.call()) {
throw new SQLException("Enlisted connection used without active transaction");
}
}
@Override
public void setFlushOnly() {
// Assumed currentState == State.CHECKED_OUT (or eventually in FLUSH already)
setState(State.FLUSH);
}
public void setFlushOnly(SQLException se) {
// Assumed currentState == State.CHECKED_OUT (or eventually in FLUSH already)
XbibConnectionPoolConfiguration.ExceptionSorter exceptionSorter = connectionPool.getConfiguration().exceptionSorter();
if (exceptionSorter != null && exceptionSorter.isFatal(se)) {
setState(State.FLUSH);
}
}
public enum State {
NEW, CHECKED_IN, CHECKED_OUT, VALIDATION, FLUSH, DESTROYED
}
public enum DirtyAttribute {
AUTOCOMMIT, TRANSACTION_ISOLATION, NETWORK_TIMEOUT, SCHEMA, CATALOG
}
}

View file

@ -0,0 +1,823 @@
package org.xbib.jdbc.pool;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAccumulator;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
import javax.sql.XAConnection;
import org.xbib.jdbc.pool.MetricsRepository.EmptyMetricsRepository;
import org.xbib.jdbc.pool.api.XbibDataSource;
import org.xbib.jdbc.pool.api.XbibDataSourceListener;
import org.xbib.jdbc.pool.api.XbibPoolInterceptor;
import org.xbib.jdbc.pool.api.cache.ConnectionCache;
import org.xbib.jdbc.pool.api.configuration.XbibConnectionPoolConfiguration;
import org.xbib.jdbc.pool.api.transaction.TransactionIntegration;
import org.xbib.jdbc.pool.util.PriorityScheduledExecutor;
import org.xbib.jdbc.pool.util.StampedCopyOnWriteArrayList;
import org.xbib.jdbc.pool.util.XbibSynchronizer;
import static java.lang.Integer.toHexString;
import static java.lang.System.identityHashCode;
import static java.lang.System.nanoTime;
import static java.lang.Thread.currentThread;
import static java.util.Collections.unmodifiableList;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
import static org.xbib.jdbc.pool.ConnectionHandler.State.CHECKED_IN;
import static org.xbib.jdbc.pool.ConnectionHandler.State.CHECKED_OUT;
import static org.xbib.jdbc.pool.ConnectionHandler.State.FLUSH;
import static org.xbib.jdbc.pool.ConnectionHandler.State.VALIDATION;
import static org.xbib.jdbc.pool.api.XbibDataSource.FlushMode.GRACEFUL;
import static org.xbib.jdbc.pool.api.XbibDataSource.FlushMode.LEAK;
import static org.xbib.jdbc.pool.api.configuration.XbibConnectionPoolConfiguration.MultipleAcquisitionAction.OFF;
import static org.xbib.jdbc.pool.util.InterceptorHelper.fireOnConnectionAcquiredInterceptor;
import static org.xbib.jdbc.pool.util.InterceptorHelper.fireOnConnectionCreateInterceptor;
import static org.xbib.jdbc.pool.util.InterceptorHelper.fireOnConnectionDestroyInterceptor;
import static org.xbib.jdbc.pool.util.InterceptorHelper.fireOnConnectionReturnInterceptor;
import static org.xbib.jdbc.pool.util.ListenerHelper.fireBeforeConnectionAcquire;
import static org.xbib.jdbc.pool.util.ListenerHelper.fireBeforeConnectionCreation;
import static org.xbib.jdbc.pool.util.ListenerHelper.fireBeforeConnectionDestroy;
import static org.xbib.jdbc.pool.util.ListenerHelper.fireBeforeConnectionFlush;
import static org.xbib.jdbc.pool.util.ListenerHelper.fireBeforeConnectionLeak;
import static org.xbib.jdbc.pool.util.ListenerHelper.fireBeforeConnectionReap;
import static org.xbib.jdbc.pool.util.ListenerHelper.fireBeforeConnectionReturn;
import static org.xbib.jdbc.pool.util.ListenerHelper.fireBeforeConnectionValidation;
import static org.xbib.jdbc.pool.util.ListenerHelper.fireOnConnectionAcquired;
import static org.xbib.jdbc.pool.util.ListenerHelper.fireOnConnectionCreation;
import static org.xbib.jdbc.pool.util.ListenerHelper.fireOnConnectionDestroy;
import static org.xbib.jdbc.pool.util.ListenerHelper.fireOnConnectionFlush;
import static org.xbib.jdbc.pool.util.ListenerHelper.fireOnConnectionInvalid;
import static org.xbib.jdbc.pool.util.ListenerHelper.fireOnConnectionLeak;
import static org.xbib.jdbc.pool.util.ListenerHelper.fireOnConnectionPooled;
import static org.xbib.jdbc.pool.util.ListenerHelper.fireOnConnectionReap;
import static org.xbib.jdbc.pool.util.ListenerHelper.fireOnConnectionReturn;
import static org.xbib.jdbc.pool.util.ListenerHelper.fireOnConnectionValid;
import static org.xbib.jdbc.pool.util.ListenerHelper.fireOnInfo;
import static org.xbib.jdbc.pool.util.ListenerHelper.fireOnWarning;
public final class ConnectionPool implements Pool {
private static final AtomicInteger HOUSEKEEP_COUNT = new AtomicInteger();
private final XbibConnectionPoolConfiguration configuration;
private final XbibDataSourceListener[] listeners;
private final StampedCopyOnWriteArrayList<ConnectionHandler> allConnections;
private final XbibSynchronizer synchronizer;
private final ConnectionFactory connectionFactory;
private final PriorityScheduledExecutor housekeepingExecutor;
private final TransactionIntegration transactionIntegration;
private final boolean borrowValidationEnabled;
private final boolean idleValidationEnabled;
private final boolean leakEnabled;
private final boolean validationEnabled;
private final boolean reapEnabled;
private final LongAccumulator maxUsed = new LongAccumulator(Math::max, Long.MIN_VALUE);
private final LongAdder activeCount = new LongAdder();
private final ConnectionCache localCache;
private MetricsRepository metricsRepository;
private List<XbibPoolInterceptor> interceptors;
public ConnectionPool(XbibConnectionPoolConfiguration configuration, XbibDataSourceListener... listeners) {
this.configuration = configuration;
this.listeners = listeners;
allConnections = new StampedCopyOnWriteArrayList<>(ConnectionHandler.class);
localCache = configuration.connectionCache();
synchronizer = new XbibSynchronizer();
connectionFactory = new ConnectionFactory(configuration.connectionFactoryConfiguration(), listeners);
housekeepingExecutor = new PriorityScheduledExecutor(1, "xbib-" + HOUSEKEEP_COUNT.incrementAndGet(), listeners);
transactionIntegration = configuration.transactionIntegration();
borrowValidationEnabled = configuration.validateOnBorrow();
idleValidationEnabled = !configuration.validateOnBorrow() && !configuration.idleValidationTimeout().isZero();
leakEnabled = !configuration.leakTimeout().isZero();
validationEnabled = !configuration.validationTimeout().isZero();
reapEnabled = !configuration.reapTimeout().isZero();
}
public void init() {
if (configuration.acquisitionTimeout().compareTo(configuration.connectionFactoryConfiguration().loginTimeout()) < 0) {
fireOnWarning(listeners, "Login timeout should be smaller than acquisition timeout");
}
if (leakEnabled) {
housekeepingExecutor.schedule(new LeakTask(), configuration.leakTimeout().toNanos(), NANOSECONDS);
}
if (validationEnabled) {
housekeepingExecutor.schedule(new ValidationTask(), configuration.validationTimeout().toNanos(), NANOSECONDS);
}
if (reapEnabled) {
housekeepingExecutor.schedule(new ReapTask(), configuration.reapTimeout().toNanos(), NANOSECONDS);
}
transactionIntegration.addResourceRecoveryFactory(connectionFactory.hasRecoveryCredentials() ? connectionFactory : this);
// fill to the initial size
if (configuration.initialSize() < configuration.minSize()) {
fireOnInfo(listeners, "Initial size smaller than min. Connections will be created when necessary");
} else if (configuration.initialSize() > configuration.maxSize()) {
fireOnInfo(listeners, "Initial size bigger than max. Connections will be destroyed as soon as they return to the pool");
}
for (int n = configuration.initialSize(); n > 0; n--) {
housekeepingExecutor.executeNow(new CreateConnectionTask().initial());
}
}
public XbibConnectionPoolConfiguration getConfiguration() {
return configuration;
}
public int defaultJdbcIsolationLevel() {
return connectionFactory.defaultJdbcIsolationLevel();
}
public XbibDataSourceListener[] getListeners() {
return listeners;
}
public List<XbibPoolInterceptor> getPoolInterceptors() {
return unmodifiableList(interceptors);
}
public void setPoolInterceptors(Collection<? extends XbibPoolInterceptor> list) {
if (list.stream().anyMatch(i -> i.getPriority() < 0)) {
throw new IllegalArgumentException("Negative priority values on XbibPoolInterceptor are reserved.");
}
if (list.isEmpty() && (interceptors == null || interceptors.isEmpty())) {
return;
}
interceptors = list.stream().sorted(XbibPoolInterceptor.DEFAULT_COMPARATOR).collect(toList());
Function<XbibPoolInterceptor, String> interceptorName = i -> i.getClass().getName() + "@" + toHexString(identityHashCode(i)) + " (priority " + i.getPriority() + ")";
fireOnInfo(listeners, "Pool interceptors: " + interceptors.stream().map(interceptorName).collect(joining(" >>> ", "[", "]")));
}
public void flushPool(XbibDataSource.FlushMode mode) {
if (mode == LEAK && !leakEnabled) {
fireOnWarning(listeners, "Flushing leak connections with no specified leak timout.");
return;
}
housekeepingExecutor.execute(new FlushTask(mode));
}
@Override
public void close() {
transactionIntegration.removeResourceRecoveryFactory(connectionFactory.hasRecoveryCredentials() ? connectionFactory : this);
for (Runnable task : housekeepingExecutor.shutdownNow()) {
if (task instanceof DestroyConnectionTask) {
task.run();
}
}
for (ConnectionHandler handler : allConnections) {
handler.setState(FLUSH);
new DestroyConnectionTask(handler).run();
}
allConnections.clear();
activeCount.reset();
synchronizer.release(synchronizer.getQueueLength());
}
@Override
public boolean isRecoverable() {
return connectionFactory.isRecoverable();
}
@Override
public XAConnection getRecoveryConnection() throws SQLException {
long stamp = beforeAcquire();
checkMultipleAcquisition();
ConnectionHandler checkedOutHandler = null;
try {
do {
checkedOutHandler = (ConnectionHandler) localCache.get();
if (checkedOutHandler == null) {
checkedOutHandler = handlerFromSharedCache();
}
} while ((borrowValidationEnabled && !borrowValidation(checkedOutHandler))
|| (idleValidationEnabled && !idleValidation(checkedOutHandler)));
activeCount.increment();
fireOnConnectionAcquiredInterceptor(interceptors, checkedOutHandler);
afterAcquire(stamp, checkedOutHandler, false);
return checkedOutHandler.xaConnectionWrapper();
} catch (Throwable t) {
if (checkedOutHandler != null) {
checkedOutHandler.setState(CHECKED_OUT, CHECKED_IN);
}
throw t;
}
}
private long beforeAcquire() throws SQLException {
fireBeforeConnectionAcquire(listeners);
if (housekeepingExecutor.isShutdown()) {
throw new SQLException("This pool is closed and does not handle any more connections!");
}
return metricsRepository.beforeConnectionAcquire();
}
private void checkMultipleAcquisition() throws SQLException {
if (configuration.multipleAcquisition() != OFF) {
for (ConnectionHandler handler : allConnections) {
if (handler.getHoldingThread() == currentThread()) {
switch (configuration.multipleAcquisition()) {
case STRICT:
throw new SQLException("Acquisition of multiple connections by the same Thread.");
case WARN:
fireOnWarning(listeners, "Acquisition of multiple connections by the same Thread. This can lead to pool exhaustion and eventually a deadlock!");
}
break;
}
}
}
}
@Override
public Connection getConnection() throws SQLException {
long stamp = beforeAcquire();
ConnectionHandler checkedOutHandler = handlerFromTransaction();
if (checkedOutHandler != null) {
// AG-140 - If associate throws here is fine, it's assumed the synchronization that returns the connection has been registered
transactionIntegration.associate(checkedOutHandler, checkedOutHandler.getXaResource());
afterAcquire(stamp, checkedOutHandler, true);
return checkedOutHandler.connectionWrapper();
}
checkMultipleAcquisition();
try {
do {
checkedOutHandler = (ConnectionHandler) localCache.get();
if (checkedOutHandler == null) {
checkedOutHandler = handlerFromSharedCache();
}
} while ((borrowValidationEnabled && !borrowValidation(checkedOutHandler))
|| (idleValidationEnabled && !idleValidation(checkedOutHandler)));
transactionIntegration.associate(checkedOutHandler, checkedOutHandler.getXaResource());
activeCount.increment();
fireOnConnectionAcquiredInterceptor(interceptors, checkedOutHandler);
afterAcquire(stamp, checkedOutHandler, true);
return checkedOutHandler.connectionWrapper();
} catch (Throwable t) {
if (checkedOutHandler != null) {
// AG-140 - Return the connection to the pool to prevent leak
checkedOutHandler.setState(CHECKED_OUT, CHECKED_IN);
}
throw t;
}
}
private ConnectionHandler handlerFromTransaction() throws SQLException {
return (ConnectionHandler) transactionIntegration.getTransactionAware();
}
private ConnectionHandler handlerFromSharedCache() throws SQLException {
long remaining = configuration.acquisitionTimeout().toNanos();
remaining = remaining > 0 ? remaining : Long.MAX_VALUE;
Future<ConnectionHandler> task = null;
try {
for (; ; ) {
// If min-size increases, create a connection right away
if (allConnections.size() < configuration.minSize()) {
task = housekeepingExecutor.executeNow(new CreateConnectionTask());
}
// Try to find an available connection in the pool
for (ConnectionHandler handler : allConnections) {
if (handler.acquire()) {
return handler;
}
}
// If no connections are available and there is room, create one
if (task == null && allConnections.size() < configuration.maxSize()) {
task = housekeepingExecutor.executeNow(new CreateConnectionTask());
}
long start = nanoTime();
if (task == null) {
// Pool full, will have to wait for a connection to be returned
if (!synchronizer.tryAcquireNanos(synchronizer.getStamp(), remaining)) {
throw new SQLException("Sorry, acquisition timeout!");
}
} else {
// Wait for the new connection instead of the synchronizer to propagate any exception on connection establishment
ConnectionHandler handler = task.get(remaining, NANOSECONDS);
if (handler != null && handler.acquire()) {
return handler;
}
task = null;
}
remaining -= nanoTime() - start;
}
} catch (InterruptedException e) {
currentThread().interrupt();
throw new SQLException("Interrupted while acquiring");
} catch (ExecutionException e) {
throw unwrapExecutionException(e);
} catch (RejectedExecutionException | CancellationException e) {
throw new SQLException("Can't create new connection as the pool is shutting down", e);
} catch (TimeoutException e) {
task.cancel(true);
// AG-201: Last effort. Connections may have returned to the pool while waiting.
for (ConnectionHandler handler : allConnections) {
if (handler.acquire()) {
return handler;
}
}
throw new SQLException("Acquisition timeout while waiting for new connection", e);
}
}
private SQLException unwrapExecutionException(ExecutionException ee) {
try {
throw ee.getCause();
} catch (RuntimeException | Error re) {
throw re;
} catch (SQLException se) {
return se;
} catch (Throwable t) {
return new SQLException("Exception while creating new connection", t);
}
}
private boolean idleValidation(ConnectionHandler handler) {
if (!handler.isIdle(configuration.idleValidationTimeout())) {
return true;
}
return borrowValidation(handler);
}
private boolean borrowValidation(ConnectionHandler handler) {
if (handler.setState(CHECKED_OUT, VALIDATION)) {
return performValidation(handler, CHECKED_OUT);
}
return false;
}
private boolean performValidation(ConnectionHandler handler, ConnectionHandler.State targetState) {
fireBeforeConnectionValidation(listeners, handler);
if (handler.isValid() && handler.setState(VALIDATION, targetState)) {
fireOnConnectionValid(listeners, handler);
synchronizer.releaseConditional();
return true;
} else {
removeFromPool(handler);
metricsRepository.afterConnectionInvalid();
fireOnConnectionInvalid(listeners, handler);
return false;
}
}
private void afterAcquire(long metricsStamp, ConnectionHandler checkedOutHandler, boolean verifyEnlistment) throws SQLException {
metricsRepository.afterConnectionAcquire(metricsStamp);
fireOnConnectionAcquired(listeners, checkedOutHandler);
if (verifyEnlistment && !checkedOutHandler.isEnlisted()) {
switch (configuration.transactionRequirement()) {
case STRICT:
returnConnectionHandler(checkedOutHandler);
throw new SQLException("Connection acquired without transaction.");
case WARN:
fireOnWarning(listeners, new SQLException("Connection acquired without transaction."));
}
}
if (leakEnabled || reapEnabled) {
checkedOutHandler.touch();
}
if (leakEnabled || configuration.multipleAcquisition() != OFF) {
if (checkedOutHandler.getHoldingThread() != null && checkedOutHandler.getHoldingThread() != currentThread()) {
Throwable warn = new Throwable("Shared connection between threads '" + checkedOutHandler.getHoldingThread().getName() + "' and '" + currentThread().getName() + "'");
warn.setStackTrace(checkedOutHandler.getHoldingThread().getStackTrace());
fireOnWarning(listeners, warn);
}
checkedOutHandler.setHoldingThread(currentThread());
if (configuration.enhancedLeakReport()) {
checkedOutHandler.setAcquisitionStackTrace(currentThread().getStackTrace());
}
}
}
// --- //
public void returnConnectionHandler(ConnectionHandler handler) throws SQLException {
fireBeforeConnectionReturn(listeners, handler);
if (leakEnabled) {
handler.setHoldingThread(null);
if (configuration.enhancedLeakReport()) {
handler.setAcquisitionStackTrace(null);
}
}
if (idleValidationEnabled || reapEnabled) {
handler.touch();
}
try {
if (!transactionIntegration.disassociate(handler)) {
return;
}
} catch (Throwable ignored) {
}
activeCount.decrement();
// resize on change of max-size, or flush on close
int currentSize = allConnections.size();
if ((currentSize > configuration.maxSize() && currentSize > configuration.minSize()) || configuration.flushOnClose()) {
handler.setState(FLUSH);
removeFromPool(handler);
metricsRepository.afterConnectionReap();
fireOnConnectionReap(listeners, handler);
return;
}
try {
handler.resetConnection();
} catch (SQLException sqlException) {
fireOnWarning(listeners, sqlException);
}
localCache.put(handler);
fireOnConnectionReturnInterceptor(interceptors, handler);
if (handler.setState(CHECKED_OUT, CHECKED_IN)) {
// here the handler is already up for grabs
synchronizer.releaseConditional();
metricsRepository.afterConnectionReturn();
fireOnConnectionReturn(listeners, handler);
} else {
// handler not in CHECKED_OUT implies FLUSH
removeFromPool(handler);
metricsRepository.afterConnectionFlush();
fireOnConnectionFlush(listeners, handler);
}
}
private void removeFromPool(ConnectionHandler handler) {
allConnections.remove(handler);
synchronizer.releaseConditional();
housekeepingExecutor.execute(new FillTask());
housekeepingExecutor.execute(new DestroyConnectionTask(handler));
}
// --- Exposed statistics //
@Override
public void onMetricsEnabled(boolean metricsEnabled) {
metricsRepository = metricsEnabled ? new DefaultMetricsRepository(this) : new EmptyMetricsRepository();
}
public MetricsRepository getMetrics() {
return metricsRepository;
}
public long activeCount() {
return activeCount.sum();
}
public long availableCount() {
return allConnections.size() - activeCount.sum();
}
public long maxUsedCount() {
return maxUsed.get();
}
public void resetMaxUsedCount() {
maxUsed.reset();
}
public long awaitingCount() {