spring-batch ExtendedConnectionDataSourceProxy 源码
spring-batch ExtendedConnectionDataSourceProxy 代码
文件路径:/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/database/ExtendedConnectionDataSourceProxy.java
/*
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.item.database;
import java.io.PrintWriter;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.util.logging.Logger;
import javax.sql.DataSource;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.jdbc.datasource.ConnectionProxy;
import org.springframework.jdbc.datasource.DataSourceUtils;
import org.springframework.jdbc.datasource.SmartDataSource;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
import org.springframework.util.MethodInvoker;
/**
* Implementation of {@link SmartDataSource} that is capable of keeping a single JDBC
* Connection which is NOT closed after each use even if {@link Connection#close()} is
* called.
*
* The connection can be kept open over multiple transactions when used together with any
* of Spring's {@link org.springframework.transaction.PlatformTransactionManager}
* implementations.
*
* <p>
* Loosely based on the SingleConnectionDataSource implementation in Spring Core. Intended
* to be used with the {@link JdbcCursorItemReader} to provide a connection that remains
* open across transaction boundaries. It remains open for the life of the cursor, and can
* be shared with the main transaction of the rest of the step processing.
*
* <p>
* Once close suppression has been turned on for a connection, it will be returned for the
* first {@link #getConnection()} call. Any subsequent calls to {@link #getConnection()}
* will retrieve a new connection from the wrapped {@link DataSource} until the
* {@link DataSourceUtils} queries whether the connection should be closed or not by
* calling {@link #shouldClose(Connection)} for the close-suppressed {@link Connection}.
* At that point the cycle starts over again, and the next {@link #getConnection()} call
* will have the {@link Connection} that is being close-suppressed returned. This allows
* the use of the close-suppressed {@link Connection} to be the main {@link Connection}
* for an extended data access process. The close suppression is turned off by calling
* {@link #stopCloseSuppression(Connection)}.
*
* <p>
* This class is not multi-threading capable.
*
* <p>
* The connection returned will be a close-suppressing proxy instead of the physical
* {@link Connection}. Be aware that you will not be able to cast this to a native
* <code>OracleConnection</code> or the like anymore; you'd be required to use
* {@link java.sql.Connection#unwrap(Class)}.
*
* @author Thomas Risberg
* @see #getConnection()
* @see java.sql.Connection#close()
* @see DataSourceUtils#releaseConnection
* @see java.sql.Connection#unwrap(Class)
* @since 2.0
*/
public class ExtendedConnectionDataSourceProxy implements SmartDataSource, InitializingBean {

    /** The wrapped DataSource that new connections are obtained from. */
    private DataSource dataSource;

    /** The connection to suppress close calls for, or {@code null} when suppression is off. */
    private Connection closeSuppressedConnection = null;

    /**
     * Whether the close-suppressed connection is currently handed out. While this is
     * {@code true}, {@link #getConnection()} obtains a new connection from the wrapped
     * DataSource instead of returning the close-suppressed one.
     */
    private boolean borrowedConnection = false;

    /** Synchronization monitor for the shared Connection. */
    private final Object connectionMonitor = new Object();

    /**
     * No arg constructor for use when configured using JavaBean style. The DataSource
     * must be supplied afterwards through {@link #setDataSource(DataSource)}.
     */
    public ExtendedConnectionDataSourceProxy() {
    }

    /**
     * Constructor that takes the {@link DataSource} to be wrapped.
     * @param dataSource DataSource to be used
     */
    public ExtendedConnectionDataSourceProxy(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * Setter for the {@link DataSource} that is to be wrapped.
     * @param dataSource the DataSource
     */
    public void setDataSource(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    /**
     * Reports whether the given connection should be closed, and marks the
     * close-suppressed connection as no longer borrowed when it is the one being asked
     * about.
     * @param connection the connection whose release is being considered
     * @return {@code false} if close suppression is active for the connection,
     * {@code true} otherwise
     * @see SmartDataSource
     */
    @Override
    public boolean shouldClose(Connection connection) {
        boolean shouldClose = !isCloseSuppressionActive(connection);
        releaseIfBorrowed(connection);
        return shouldClose;
    }

    /**
     * Return the status of close suppression being activated for a given
     * {@link Connection}
     * @param connection the {@link Connection} that the close suppression status is
     * requested for
     * @return true if the connection is non-null and equal to the connection that close
     * suppression was started for, false otherwise
     */
    public boolean isCloseSuppressionActive(Connection connection) {
        return connection != null && connection.equals(closeSuppressedConnection);
    }

    /**
     * Turns on close suppression for the given connection. If a transaction is active
     * at that moment the connection is also flagged as borrowed, so that the next
     * {@link #getConnection()} call obtains a new connection from the wrapped
     * DataSource rather than this one.
     * @param connection the {@link Connection} that close suppression is requested for
     */
    public void startCloseSuppression(Connection connection) {
        synchronized (this.connectionMonitor) {
            closeSuppressedConnection = connection;
            if (TransactionSynchronizationManager.isActualTransactionActive()) {
                borrowedConnection = true;
            }
        }
    }

    /**
     * Turns off close suppression. Note that the argument is not consulted: suppression
     * is switched off for whichever connection it is currently active for.
     * @param connection the {@link Connection} that close suppression should be turned
     * off for
     */
    public void stopCloseSuppression(Connection connection) {
        synchronized (this.connectionMonitor) {
            closeSuppressedConnection = null;
            borrowedConnection = false;
        }
    }

    /**
     * Returns the close-suppressed connection if there is one and it is not currently
     * borrowed; otherwise a close-suppressing proxy around a new connection from the
     * wrapped DataSource.
     */
    @Override
    public Connection getConnection() throws SQLException {
        synchronized (this.connectionMonitor) {
            return initConnection(null, null);
        }
    }

    /**
     * Same as {@link #getConnection()}, but a new connection is requested from the
     * wrapped DataSource with the given credentials when the username is not null.
     */
    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        synchronized (this.connectionMonitor) {
            return initConnection(username, password);
        }
    }

    /**
     * Called by the connection proxy when {@code close()} is invoked on it.
     * @param connection the proxy being closed
     * @return true if the close call must be suppressed, false if the target connection
     * should really be closed
     */
    private boolean completeCloseCall(Connection connection) {
        releaseIfBorrowed(connection);
        return isCloseSuppressionActive(connection);
    }

    /**
     * Clears the borrowed flag when the given connection is the close-suppressed one.
     * The null check matters: the borrowed flag can be set while no connection is
     * registered (for example after {@code startCloseSuppression(null)} inside a
     * transaction), and comparing without it would throw a NullPointerException.
     */
    private void releaseIfBorrowed(Connection connection) {
        if (borrowedConnection && closeSuppressedConnection != null
                && closeSuppressedConnection.equals(connection)) {
            borrowedConnection = false;
        }
    }

    /**
     * Hands out the close-suppressed connection when it is available, otherwise fetches
     * a connection from the wrapped DataSource and wraps it in a close-suppressing
     * proxy. Callers hold {@link #connectionMonitor}.
     */
    private Connection initConnection(String username, String password) throws SQLException {
        if (closeSuppressedConnection != null && !borrowedConnection) {
            // Hand out the shared connection once; further calls get new connections
            // until it is released again via shouldClose(..) or close().
            borrowedConnection = true;
            return closeSuppressedConnection;
        }

        Connection target;
        if (username != null) {
            target = dataSource.getConnection(username, password);
        }
        else {
            target = dataSource.getConnection();
        }
        return getCloseSuppressingConnectionProxy(target);
    }

    /** Delegates to the wrapped DataSource. */
    @Override
    public PrintWriter getLogWriter() throws SQLException {
        return dataSource.getLogWriter();
    }

    /** Delegates to the wrapped DataSource. */
    @Override
    public int getLoginTimeout() throws SQLException {
        return dataSource.getLoginTimeout();
    }

    /** Delegates to the wrapped DataSource. */
    @Override
    public void setLogWriter(PrintWriter out) throws SQLException {
        dataSource.setLogWriter(out);
    }

    /** Delegates to the wrapped DataSource. */
    @Override
    public void setLoginTimeout(int seconds) throws SQLException {
        dataSource.setLoginTimeout(seconds);
    }

    /**
     * Wrap the given Connection with a proxy that delegates every method call to it but
     * suppresses close calls.
     * @param target the original Connection to wrap
     * @return the wrapped Connection
     */
    protected Connection getCloseSuppressingConnectionProxy(Connection target) {
        return (Connection) Proxy.newProxyInstance(ConnectionProxy.class.getClassLoader(),
                new Class[] { ConnectionProxy.class }, new CloseSuppressingInvocationHandler(target, this));
    }

    /**
     * Invocation handler that suppresses close calls on JDBC Connections until the
     * associated instance of the ExtendedConnectionDataSourceProxy determines the
     * connection should actually be closed.
     */
    private static class CloseSuppressingInvocationHandler implements InvocationHandler {

        /** The connection that calls are forwarded to. */
        private final Connection target;

        /** The owning proxy DataSource that decides whether a close is suppressed. */
        private final ExtendedConnectionDataSourceProxy dataSource;

        public CloseSuppressingInvocationHandler(Connection target, ExtendedConnectionDataSourceProxy dataSource) {
            this.dataSource = dataSource;
            this.target = target;
        }

        /**
         * Handles {@code equals}, {@code hashCode}, {@code close} and
         * {@code getTargetConnection} locally and forwards every other call to the
         * target connection, rethrowing the target's own exception on failure.
         */
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            // Invocation on ConnectionProxy interface coming in...
            switch (method.getName()) {
                case "equals":
                    // Only consider equal when proxies are identical.
                    return (proxy == args[0] ? Boolean.TRUE : Boolean.FALSE);
                case "hashCode":
                    // Use hashCode of Connection proxy.
                    return System.identityHashCode(proxy);
                case "close":
                    // Handle close method: don't pass the call on if we are
                    // suppressing close calls.
                    if (!dataSource.completeCloseCall((Connection) proxy)) {
                        target.close();
                    }
                    return null;
                case "getTargetConnection":
                    // Handle getTargetConnection method: return underlying
                    // Connection.
                    return this.target;
            }

            // Invoke method on target Connection.
            try {
                return method.invoke(this.target, args);
            }
            catch (InvocationTargetException ex) {
                // Surface the exception thrown by the target, not the reflective wrapper.
                throw ex.getTargetException();
            }
        }

    }

    /**
     * Performs only a 'shallow' non-recursive check: returns true if either this object
     * or the wrapped DataSource is an instance of the given type. The wrapped
     * DataSource's own {@code isWrapperFor} is not consulted.
     */
    @Override
    public boolean isWrapperFor(Class<?> iface) throws SQLException {
        return iface.isInstance(this) || iface.isInstance(dataSource);
    }

    /**
     * Returns either self or delegate (in this order) if one of them is an instance of
     * the supplied parameter class. Does *not* support recursive unwrapping of the
     * delegate.
     * @throws SQLException if neither this object nor the wrapped DataSource is an
     * instance of the requested type
     */
    @Override
    public <T> T unwrap(Class<T> iface) throws SQLException {
        if (iface.isInstance(this)) {
            return iface.cast(this);
        }
        if (iface.isInstance(dataSource)) {
            return iface.cast(dataSource);
        }
        throw new SQLException("Unsupported class " + iface.getSimpleName());
    }

    /**
     * Verifies that a DataSource has been configured.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(dataSource, "DataSource is required");
    }

    /**
     * Delegates to the wrapped DataSource.
     * @throws SQLFeatureNotSupportedException if the wrapped DataSource reports that it
     * does not support this feature
     */
    @Override
    public Logger getParentLogger() throws SQLFeatureNotSupportedException {
        return dataSource.getParentLogger();
    }

}
相关信息
相关文章
spring-batch AbstractCursorItemReader 源码
spring-batch AbstractPagingItemReader 源码
spring-batch BeanPropertyItemSqlParameterSourceProvider 源码
spring-batch HibernateCursorItemReader 源码
spring-batch HibernateItemReaderHelper 源码
spring-batch HibernateItemWriter 源码
spring-batch HibernatePagingItemReader 源码
spring-batch ItemPreparedStatementSetter 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦