spring-data-redis LettucePoolingConnectionProvider 源码
spring-data-redis LettucePoolingConnectionProvider 代码
文件路径:/src/main/java/org/springframework/data/redis/connection/lettuce/LettucePoolingConnectionProvider.java
/*
* Copyright 2017-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.connection.lettuce;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.support.AsyncConnectionPoolSupport;
import io.lettuce.core.support.AsyncPool;
import io.lettuce.core.support.BoundedPoolConfig;
import io.lettuce.core.support.CommonsPool2ConfigConverter;
import io.lettuce.core.support.ConnectionPoolSupport;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.data.redis.connection.PoolException;
import org.springframework.util.Assert;
/**
* {@link LettuceConnectionProvider} with connection pooling support. This connection provider holds multiple pools (one
* per connection type and allocation type (synchronous/asynchronous)) for contextualized connection allocation.
* <p>
* Each allocated connection is tracked and to be returned into the pool which created the connection. Instances of this
* class require {@link #destroy() disposal} to de-allocate lingering connections that were not returned to the pool and
* to close the pools.
* <p>
* This provider maintains separate pools due to the allocation nature (synchronous/asynchronous). Asynchronous
* connection pooling requires a non-blocking allocation API. Connections requested asynchronously can be returned
* synchronously and vice versa. A connection obtained synchronously is returned to the synchronous pool even if
* {@link #releaseAsync(StatefulConnection) released asynchronously}. This is an undesired case as the synchronous pool
* will block the asynchronous flow for the time of release.
*
* @author Mark Paluch
* @author Christoph Strobl
* @since 2.0
* @see #getConnection(Class)
*/
class LettucePoolingConnectionProvider implements LettuceConnectionProvider, RedisClientProvider, DisposableBean {
private final static Log log = LogFactory.getLog(LettucePoolingConnectionProvider.class);
private final LettuceConnectionProvider connectionProvider;
private final GenericObjectPoolConfig poolConfig;
private final Map<StatefulConnection<?, ?>, GenericObjectPool<StatefulConnection<?, ?>>> poolRef = new ConcurrentHashMap<>(
32);
private final Map<StatefulConnection<?, ?>, AsyncPool<StatefulConnection<?, ?>>> asyncPoolRef = new ConcurrentHashMap<>(
32);
private final Map<CompletableFuture<StatefulConnection<?, ?>>, AsyncPool<StatefulConnection<?, ?>>> inProgressAsyncPoolRef = new ConcurrentHashMap<>(
32);
private final Map<Class<?>, GenericObjectPool<StatefulConnection<?, ?>>> pools = new ConcurrentHashMap<>(32);
private final Map<Class<?>, AsyncPool<StatefulConnection<?, ?>>> asyncPools = new ConcurrentHashMap<>(32);
private final BoundedPoolConfig asyncPoolConfig;
LettucePoolingConnectionProvider(LettuceConnectionProvider connectionProvider,
LettucePoolingClientConfiguration clientConfiguration) {
Assert.notNull(connectionProvider, "ConnectionProvider must not be null");
Assert.notNull(clientConfiguration, "ClientConfiguration must not be null");
this.connectionProvider = connectionProvider;
this.poolConfig = clientConfiguration.getPoolConfig();
this.asyncPoolConfig = CommonsPool2ConfigConverter.bounded(this.poolConfig);
}
@Override
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
GenericObjectPool<StatefulConnection<?, ?>> pool = pools.computeIfAbsent(connectionType, poolType -> {
return ConnectionPoolSupport.createGenericObjectPool(() -> connectionProvider.getConnection(connectionType),
poolConfig, false);
});
try {
StatefulConnection<?, ?> connection = pool.borrowObject();
poolRef.put(connection, pool);
return connectionType.cast(connection);
} catch (Exception e) {
throw new PoolException("Could not get a resource from the pool", e);
}
}
@Override
public <T extends StatefulConnection<?, ?>> CompletionStage<T> getConnectionAsync(Class<T> connectionType) {
AsyncPool<StatefulConnection<?, ?>> pool = asyncPools.computeIfAbsent(connectionType, poolType -> {
return AsyncConnectionPoolSupport.createBoundedObjectPool(
() -> connectionProvider.getConnectionAsync(connectionType).thenApply(connectionType::cast), asyncPoolConfig,
false);
});
CompletableFuture<StatefulConnection<?, ?>> acquire = pool.acquire();
inProgressAsyncPoolRef.put(acquire, pool);
return acquire.whenComplete((connection, e) -> {
inProgressAsyncPoolRef.remove(acquire);
if (connection != null) {
asyncPoolRef.put(connection, pool);
}
}).thenApply(connectionType::cast);
}
@Override
public AbstractRedisClient getRedisClient() {
if (connectionProvider instanceof RedisClientProvider) {
return ((RedisClientProvider) connectionProvider).getRedisClient();
}
throw new IllegalStateException(
String.format("Underlying connection provider %s does not implement RedisClientProvider",
connectionProvider.getClass().getName()));
}
@Override
public void release(StatefulConnection<?, ?> connection) {
GenericObjectPool<StatefulConnection<?, ?>> pool = poolRef.remove(connection);
if (pool == null) {
AsyncPool<StatefulConnection<?, ?>> asyncPool = asyncPoolRef.remove(connection);
if (asyncPool == null) {
throw new PoolException("Returned connection " + connection
+ " was either previously returned or does not belong to this connection provider");
}
discardIfNecessary(connection);
asyncPool.release(connection).join();
return;
}
discardIfNecessary(connection);
pool.returnObject(connection);
}
private void discardIfNecessary(StatefulConnection<?, ?> connection) {
if (connection instanceof StatefulRedisConnection) {
StatefulRedisConnection<?, ?> redisConnection = (StatefulRedisConnection<?, ?>) connection;
if (redisConnection.isMulti()) {
redisConnection.async().discard();
}
}
}
@Override
public CompletableFuture<Void> releaseAsync(StatefulConnection<?, ?> connection) {
GenericObjectPool<StatefulConnection<?, ?>> blockingPool = poolRef.remove(connection);
if (blockingPool != null) {
log.warn("Releasing asynchronously a connection that was obtained from a non-blocking pool");
blockingPool.returnObject(connection);
return CompletableFuture.completedFuture(null);
}
AsyncPool<StatefulConnection<?, ?>> pool = asyncPoolRef.remove(connection);
if (pool == null) {
return LettuceFutureUtils.failed(new PoolException("Returned connection " + connection
+ " was either previously returned or does not belong to this connection provider"));
}
return pool.release(connection);
}
@Override
public void destroy() throws Exception {
List<CompletableFuture<?>> futures = new ArrayList<>();
if (!poolRef.isEmpty() || !asyncPoolRef.isEmpty()) {
log.warn("LettucePoolingConnectionProvider contains unreleased connections");
}
if (!inProgressAsyncPoolRef.isEmpty()) {
log.warn("LettucePoolingConnectionProvider has active connection retrievals");
inProgressAsyncPoolRef.forEach((k, v) -> futures.add(k.thenApply(StatefulConnection::closeAsync)));
}
if (!poolRef.isEmpty()) {
poolRef.forEach((connection, pool) -> pool.returnObject(connection));
poolRef.clear();
}
if (!asyncPoolRef.isEmpty()) {
asyncPoolRef.forEach((connection, pool) -> futures.add(pool.release(connection)));
asyncPoolRef.clear();
}
pools.forEach((type, pool) -> pool.close());
CompletableFuture
.allOf(futures.stream().map(it -> it.exceptionally(LettuceFutureUtils.ignoreErrors()))
.toArray(CompletableFuture[]::new)) //
.thenCompose(ignored -> {
CompletableFuture[] poolClose = asyncPools.values().stream().map(AsyncPool::closeAsync)
.map(it -> it.exceptionally(LettuceFutureUtils.ignoreErrors())).toArray(CompletableFuture[]::new);
return CompletableFuture.allOf(poolClose);
}) //
.thenRun(() -> {
asyncPoolRef.clear();
inProgressAsyncPoolRef.clear();
}) //
.join();
pools.clear();
}
}
相关信息
相关文章
spring-data-redis ClusterConnectionProvider 源码
spring-data-redis DefaultLettuceClientConfiguration 源码
spring-data-redis DefaultLettucePoolingClientConfiguration 源码
spring-data-redis LettuceByteBufferPubSubListenerWrapper 源码
spring-data-redis LettuceClientConfiguration 源码
spring-data-redis LettuceClusterConnection 源码
spring-data-redis LettuceClusterGeoCommands 源码
spring-data-redis LettuceClusterHashCommands 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦