spring-data-redis LettuceConnectionFactory 源码
spring-data-redis LettuceConnectionFactory 代码
文件路径:/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java
/*
* Copyright 2011-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.connection.lettuce;
import static org.springframework.data.redis.connection.lettuce.LettuceConnection.*;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ClientOptions;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisConnectionException;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.cluster.ClusterClientOptions;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.resource.ClientResources;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.data.redis.ExceptionTranslationStrategy;
import org.springframework.data.redis.PassThroughExceptionTranslationStrategy;
import org.springframework.data.redis.RedisConnectionFailureException;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.RedisConfiguration.ClusterConfiguration;
import org.springframework.data.redis.connection.RedisConfiguration.DomainSocketConfiguration;
import org.springframework.data.redis.connection.RedisConfiguration.WithDatabaseIndex;
import org.springframework.data.redis.connection.RedisConfiguration.WithPassword;
import org.springframework.data.util.Optionals;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;
/**
* Connection factory creating <a href="https://github.com/mp911de/lettuce">Lettuce</a>-based connections.
* <p>
* This factory creates a new {@link LettuceConnection} on each call to {@link #getConnection()}. Multiple
* {@link LettuceConnection}s share a single thread-safe native connection by default.
* <p>
* The shared native connection is never closed by {@link LettuceConnection}, therefore it is not validated by default
* on {@link #getConnection()}. Use {@link #setValidateConnection(boolean)} to change this behavior if necessary. If
* {@code shareNativeConnection} is {@literal true}, a shared connection will be used for regular operations and a
* {@link LettuceConnectionProvider} will be used to select a connection for blocking and tx operations only, which
* should not share a connection. If native connection sharing is disabled, new (or pooled) connections will be used for
* all operations.
* <p>
* {@link LettuceConnectionFactory} should be configured using an environmental configuration and the
* {@link LettuceConnectionFactory client configuration}. Lettuce supports the following environmental configurations:
* <ul>
* <li>{@link RedisStandaloneConfiguration}</li>
* <li>{@link RedisStaticMasterReplicaConfiguration}</li>
* <li>{@link RedisSocketConfiguration}</li>
* <li>{@link RedisSentinelConfiguration}</li>
* <li>{@link RedisClusterConfiguration}</li>
* </ul>
* <p>
* This connection factory must be {@link #afterPropertiesSet() initialized} prior to {@link #getConnection obtaining
* connections}.
*
* @author Costin Leau
* @author Jennifer Hickey
* @author Thomas Darimont
* @author Christoph Strobl
* @author Mark Paluch
* @author Balázs Németh
* @author Ruben Cervilla
* @author Luis De Bello
* @author Andrea Como
* @author Chris Bono
*/
public class LettuceConnectionFactory
implements InitializingBean, DisposableBean, RedisConnectionFactory, ReactiveRedisConnectionFactory {
private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new PassThroughExceptionTranslationStrategy(
LettuceExceptionConverter.INSTANCE);
private final Log log = LogFactory.getLog(getClass());
private final LettuceClientConfiguration clientConfiguration;
private @Nullable AbstractRedisClient client;
private @Nullable LettuceConnectionProvider connectionProvider;
private @Nullable LettuceConnectionProvider reactiveConnectionProvider;
private boolean validateConnection = false;
private boolean shareNativeConnection = true;
private boolean eagerInitialization = false;
private @Nullable SharedConnection<byte[]> connection;
private @Nullable SharedConnection<ByteBuffer> reactiveConnection;
/** Synchronization monitor for the shared Connection */
private final Object connectionMonitor = new Object();
private boolean convertPipelineAndTxResults = true;
private RedisStandaloneConfiguration standaloneConfig = new RedisStandaloneConfiguration("localhost", 6379);
private PipeliningFlushPolicy pipeliningFlushPolicy = PipeliningFlushPolicy.flushEachCommand();
private @Nullable RedisConfiguration configuration;
private @Nullable ClusterCommandExecutor clusterCommandExecutor;
private boolean initialized;
private boolean destroyed;
/**
* Constructs a new {@link LettuceConnectionFactory} instance with default settings.
*/
public LettuceConnectionFactory() {
this(new MutableLettuceClientConfiguration());
}
/**
* Constructs a new {@link LettuceConnectionFactory} instance with default settings.
*/
public LettuceConnectionFactory(RedisStandaloneConfiguration configuration) {
this(configuration, new MutableLettuceClientConfiguration());
}
/**
* Constructs a new {@link LettuceConnectionFactory} instance given {@link LettuceClientConfiguration}.
*
* @param clientConfig must not be {@literal null}
* @since 2.0
*/
private LettuceConnectionFactory(LettuceClientConfiguration clientConfig) {
Assert.notNull(clientConfig, "LettuceClientConfiguration must not be null");
this.clientConfiguration = clientConfig;
this.configuration = this.standaloneConfig;
}
/**
* Constructs a new {@link LettuceConnectionFactory} instance with default settings.
*/
public LettuceConnectionFactory(String host, int port) {
this(new RedisStandaloneConfiguration(host, port), new MutableLettuceClientConfiguration());
}
/**
* Constructs a new {@link LettuceConnectionFactory} instance using the given {@link RedisSocketConfiguration}.
*
* @param redisConfiguration must not be {@literal null}.
* @since 2.1
*/
public LettuceConnectionFactory(RedisConfiguration redisConfiguration) {
this(redisConfiguration, new MutableLettuceClientConfiguration());
}
/**
* Constructs a new {@link LettuceConnectionFactory} instance using the given {@link RedisSentinelConfiguration}.
*
* @param sentinelConfiguration must not be {@literal null}.
* @since 1.6
*/
public LettuceConnectionFactory(RedisSentinelConfiguration sentinelConfiguration) {
this(sentinelConfiguration, new MutableLettuceClientConfiguration());
}
/**
* Constructs a new {@link LettuceConnectionFactory} instance using the given {@link RedisClusterConfiguration}
* applied to create a {@link RedisClusterClient}.
*
* @param clusterConfiguration must not be {@literal null}.
* @since 1.7
*/
public LettuceConnectionFactory(RedisClusterConfiguration clusterConfiguration) {
this(clusterConfiguration, new MutableLettuceClientConfiguration());
}
/**
* Constructs a new {@link LettuceConnectionFactory} instance using the given {@link RedisStandaloneConfiguration} and
* {@link LettuceClientConfiguration}.
*
* @param standaloneConfig must not be {@literal null}.
* @param clientConfig must not be {@literal null}.
* @since 2.0
*/
public LettuceConnectionFactory(RedisStandaloneConfiguration standaloneConfig,
LettuceClientConfiguration clientConfig) {
this(clientConfig);
Assert.notNull(standaloneConfig, "RedisStandaloneConfiguration must not be null");
this.standaloneConfig = standaloneConfig;
this.configuration = this.standaloneConfig;
}
/**
* Constructs a new {@link LettuceConnectionFactory} instance using the given
* {@link RedisStaticMasterReplicaConfiguration} and {@link LettuceClientConfiguration}.
*
* @param redisConfiguration must not be {@literal null}.
* @param clientConfig must not be {@literal null}.
* @since 2.1
*/
public LettuceConnectionFactory(RedisConfiguration redisConfiguration, LettuceClientConfiguration clientConfig) {
this(clientConfig);
Assert.notNull(redisConfiguration, "RedisConfiguration must not be null");
this.configuration = redisConfiguration;
}
/**
* Constructs a new {@link LettuceConnectionFactory} instance using the given {@link RedisSentinelConfiguration} and
* {@link LettuceClientConfiguration}.
*
* @param sentinelConfiguration must not be {@literal null}.
* @param clientConfig must not be {@literal null}.
* @since 2.0
*/
public LettuceConnectionFactory(RedisSentinelConfiguration sentinelConfiguration,
LettuceClientConfiguration clientConfig) {
this(clientConfig);
Assert.notNull(sentinelConfiguration, "RedisSentinelConfiguration must not be null");
this.configuration = sentinelConfiguration;
}
/**
* Constructs a new {@link LettuceConnectionFactory} instance using the given {@link RedisClusterConfiguration} and
* {@link LettuceClientConfiguration}.
*
* @param clusterConfiguration must not be {@literal null}.
* @param clientConfig must not be {@literal null}.
* @since 2.0
*/
public LettuceConnectionFactory(RedisClusterConfiguration clusterConfiguration,
LettuceClientConfiguration clientConfig) {
this(clientConfig);
Assert.notNull(clusterConfiguration, "RedisClusterConfiguration must not be null");
this.configuration = clusterConfiguration;
}
/**
* Creates a {@link RedisConfiguration} based on a {@link String URI} according to the following:
* <ul>
* <li>If {@code redisUri} contains sentinels, a {@link RedisSentinelConfiguration} is returned</li>
* <li>If {@code redisUri} has a configured socket a {@link RedisSocketConfiguration} is returned</li>
* <li>Otherwise a {@link RedisStandaloneConfiguration} is returned</li>
* </ul>
*
* @param redisUri the connection URI in the format of a {@link RedisURI}.
* @return an appropriate {@link RedisConfiguration} instance representing the Redis URI.
* @since 2.5.3
* @see RedisURI
*/
public static RedisConfiguration createRedisConfiguration(String redisUri) {
Assert.hasText(redisUri, "RedisURI must not be null and not empty");
return createRedisConfiguration(RedisURI.create(redisUri));
}
/**
* Creates a {@link RedisConfiguration} based on a {@link RedisURI} according to the following:
* <ul>
* <li>If {@link RedisURI} contains sentinels, a {@link RedisSentinelConfiguration} is returned</li>
* <li>If {@link RedisURI} has a configured socket a {@link RedisSocketConfiguration} is returned</li>
* <li>Otherwise a {@link RedisStandaloneConfiguration} is returned</li>
* </ul>
*
* @param redisUri the connection URI.
* @return an appropriate {@link RedisConfiguration} instance representing the Redis URI.
* @since 2.5.3
* @see RedisURI
*/
public static RedisConfiguration createRedisConfiguration(RedisURI redisUri) {
Assert.notNull(redisUri, "RedisURI must not be null");
if (!ObjectUtils.isEmpty(redisUri.getSentinels())) {
return LettuceConverters.createRedisSentinelConfiguration(redisUri);
}
if (!ObjectUtils.isEmpty(redisUri.getSocket())) {
return LettuceConverters.createRedisSocketConfiguration(redisUri);
}
return LettuceConverters.createRedisStandaloneConfiguration(redisUri);
}
public void afterPropertiesSet() {
this.client = createClient();
this.connectionProvider = new ExceptionTranslatingConnectionProvider(createConnectionProvider(client, CODEC));
this.reactiveConnectionProvider = new ExceptionTranslatingConnectionProvider(
createConnectionProvider(client, LettuceReactiveRedisConnection.CODEC));
if (isClusterAware()) {
this.clusterCommandExecutor = new ClusterCommandExecutor(
new LettuceClusterTopologyProvider((RedisClusterClient) client),
new LettuceClusterConnection.LettuceClusterNodeResourceProvider(this.connectionProvider),
EXCEPTION_TRANSLATION);
}
this.initialized = true;
if (getEagerInitialization() && getShareNativeConnection()) {
initConnection();
}
}
public void destroy() {
resetConnection();
if (clusterCommandExecutor != null) {
try {
clusterCommandExecutor.destroy();
} catch (Exception ex) {
log.warn("Cannot properly close cluster command executor", ex);
}
}
dispose(connectionProvider);
dispose(reactiveConnectionProvider);
try {
Duration quietPeriod = clientConfiguration.getShutdownQuietPeriod();
Duration timeout = clientConfiguration.getShutdownTimeout();
client.shutdown(quietPeriod.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS);
} catch (Exception e) {
if (log.isWarnEnabled()) {
log.warn((client != null ? ClassUtils.getShortName(client.getClass()) : "LettuceClient")
+ " did not shut down gracefully.", e);
}
}
this.destroyed = true;
}
private void dispose(LettuceConnectionProvider connectionProvider) {
if (connectionProvider instanceof DisposableBean) {
try {
((DisposableBean) connectionProvider).destroy();
} catch (Exception e) {
if (log.isWarnEnabled()) {
log.warn(connectionProvider + " did not shut down gracefully.", e);
}
}
}
}
public RedisConnection getConnection() {
assertInitialized();
if (isClusterAware()) {
return getClusterConnection();
}
LettuceConnection connection = doCreateLettuceConnection(getSharedConnection(), connectionProvider, getTimeout(),
getDatabase());
connection.setConvertPipelineAndTxResults(convertPipelineAndTxResults);
return connection;
}
@Override
public RedisClusterConnection getClusterConnection() {
assertInitialized();
if (!isClusterAware()) {
throw new InvalidDataAccessApiUsageException("Cluster is not configured");
}
RedisClusterClient clusterClient = (RedisClusterClient) client;
StatefulRedisClusterConnection<byte[], byte[]> sharedConnection = getSharedClusterConnection();
LettuceClusterTopologyProvider topologyProvider = new LettuceClusterTopologyProvider(clusterClient);
return doCreateLettuceClusterConnection(sharedConnection, connectionProvider, topologyProvider,
clusterCommandExecutor, clientConfiguration.getCommandTimeout());
}
/**
* Customization hook for {@link LettuceConnection} creation.
*
* @param sharedConnection the shared {@link StatefulRedisConnection} if {@link #getShareNativeConnection()} is
* {@literal true}; {@literal null} otherwise.
* @param connectionProvider the {@link LettuceConnectionProvider} to release connections.
* @param timeout command timeout in {@link TimeUnit#MILLISECONDS}.
* @param database database index to operate on.
* @return the {@link LettuceConnection}.
* @throws IllegalArgumentException if a required parameter is {@literal null}.
* @since 2.2
*/
protected LettuceConnection doCreateLettuceConnection(
@Nullable StatefulRedisConnection<byte[], byte[]> sharedConnection, LettuceConnectionProvider connectionProvider,
long timeout, int database) {
LettuceConnection connection = new LettuceConnection(sharedConnection, connectionProvider, timeout, database);
connection.setPipeliningFlushPolicy(this.pipeliningFlushPolicy);
return connection;
}
/**
* Customization hook for {@link LettuceClusterConnection} creation.
*
* @param sharedConnection the shared {@link StatefulRedisConnection} if {@link #getShareNativeConnection()} is
* {@literal true}; {@literal null} otherwise.
* @param connectionProvider the {@link LettuceConnectionProvider} to release connections.
* @param topologyProvider the {@link ClusterTopologyProvider}.
* @param clusterCommandExecutor the {@link ClusterCommandExecutor} to release connections.
* @param commandTimeout command timeout {@link Duration}.
* @return the {@link LettuceConnection}.
* @throws IllegalArgumentException if a required parameter is {@literal null}.
* @since 2.2
*/
protected LettuceClusterConnection doCreateLettuceClusterConnection(
@Nullable StatefulRedisClusterConnection<byte[], byte[]> sharedConnection,
LettuceConnectionProvider connectionProvider, ClusterTopologyProvider topologyProvider,
ClusterCommandExecutor clusterCommandExecutor, Duration commandTimeout) {
LettuceClusterConnection connection = new LettuceClusterConnection(sharedConnection, connectionProvider,
topologyProvider, clusterCommandExecutor, commandTimeout);
connection.setPipeliningFlushPolicy(this.pipeliningFlushPolicy);
return connection;
}
@Override
public LettuceReactiveRedisConnection getReactiveConnection() {
assertInitialized();
if (isClusterAware()) {
return getReactiveClusterConnection();
}
return getShareNativeConnection()
? new LettuceReactiveRedisConnection(getSharedReactiveConnection(), reactiveConnectionProvider)
: new LettuceReactiveRedisConnection(reactiveConnectionProvider);
}
@Override
public LettuceReactiveRedisClusterConnection getReactiveClusterConnection() {
assertInitialized();
if (!isClusterAware()) {
throw new InvalidDataAccessApiUsageException("Cluster is not configured");
}
RedisClusterClient client = (RedisClusterClient) this.client;
return getShareNativeConnection()
? new LettuceReactiveRedisClusterConnection(getSharedReactiveConnection(), reactiveConnectionProvider, client)
: new LettuceReactiveRedisClusterConnection(reactiveConnectionProvider, client);
}
/**
* Initialize the shared connection if {@link #getShareNativeConnection() native connection sharing} is enabled and
* reset any previously existing connection.
*/
public void initConnection() {
resetConnection();
if (isClusterAware()) {
getSharedClusterConnection();
} else {
getSharedConnection();
}
getSharedReactiveConnection();
}
/**
* Reset the underlying shared Connection, to be reinitialized on next access.
*/
public void resetConnection() {
assertInitialized();
Optionals.toStream(Optional.ofNullable(connection), Optional.ofNullable(reactiveConnection))
.forEach(SharedConnection::resetConnection);
synchronized (this.connectionMonitor) {
this.connection = null;
this.reactiveConnection = null;
}
}
/**
* Validate the shared connections and reinitialize if invalid.
*/
public void validateConnection() {
assertInitialized();
getOrCreateSharedConnection().validateConnection();
getOrCreateSharedReactiveConnection().validateConnection();
}
private SharedConnection<byte[]> getOrCreateSharedConnection() {
synchronized (this.connectionMonitor) {
if (this.connection == null) {
this.connection = new SharedConnection<>(connectionProvider);
}
return this.connection;
}
}
private SharedConnection<ByteBuffer> getOrCreateSharedReactiveConnection() {
synchronized (this.connectionMonitor) {
if (this.reactiveConnection == null) {
this.reactiveConnection = new SharedConnection<>(reactiveConnectionProvider);
}
return this.reactiveConnection;
}
}
public DataAccessException translateExceptionIfPossible(RuntimeException ex) {
return EXCEPTION_TRANSLATION.translate(ex);
}
/**
* Returns the current host.
*
* @return the host.
*/
public String getHostName() {
return RedisConfiguration.getHostOrElse(configuration, standaloneConfig::getHostName);
}
/**
* Sets the hostname.
*
* @param hostName the hostname to set.
* @deprecated since 2.0, configure the hostname using {@link RedisStandaloneConfiguration}.
*/
@Deprecated
public void setHostName(String hostName) {
standaloneConfig.setHostName(hostName);
}
/**
* Returns the current port.
*
* @return the port.
*/
public int getPort() {
return RedisConfiguration.getPortOrElse(configuration, standaloneConfig::getPort);
}
/**
* Sets the port.
*
* @param port the port to set.
* @deprecated since 2.0, configure the port using {@link RedisStandaloneConfiguration}.
*/
@Deprecated
public void setPort(int port) {
standaloneConfig.setPort(port);
}
/**
* Configures the flushing policy when using pipelining. If not set, defaults to
* {@link PipeliningFlushPolicy#flushEachCommand() flush on each command}.
*
* @param pipeliningFlushPolicy the flushing policy to control when commands get written to the Redis connection.
* @see LettuceConnection#openPipeline()
* @see StatefulRedisConnection#flushCommands()
* @since 2.3
*/
public void setPipeliningFlushPolicy(PipeliningFlushPolicy pipeliningFlushPolicy) {
Assert.notNull(pipeliningFlushPolicy, "PipeliningFlushingPolicy must not be null");
this.pipeliningFlushPolicy = pipeliningFlushPolicy;
}
/**
* Returns the connection timeout (in milliseconds).
*
* @return connection timeout.
*/
public long getTimeout() {
return getClientTimeout();
}
/**
* Sets the connection timeout (in milliseconds).
*
* @param timeout the timeout.
* @deprecated since 2.0, configure the timeout using {@link LettuceClientConfiguration}.
* @throws IllegalStateException if {@link LettuceClientConfiguration} is immutable.
*/
@Deprecated
public void setTimeout(long timeout) {
getMutableConfiguration().setTimeout(Duration.ofMillis(timeout));
}
/**
* Returns whether to use SSL.
*
* @return use of SSL.
*/
public boolean isUseSsl() {
return clientConfiguration.isUseSsl();
}
/**
* Sets to use SSL connection.
*
* @param useSsl {@literal true} to use SSL.
* @deprecated since 2.0, configure SSL usage using {@link LettuceClientConfiguration}.
* @throws IllegalStateException if {@link LettuceClientConfiguration} is immutable.
*/
@Deprecated
public void setUseSsl(boolean useSsl) {
getMutableConfiguration().setUseSsl(useSsl);
}
/**
* Returns whether to verify certificate validity/hostname check when SSL is used.
*
* @return whether to verify peers when using SSL.
*/
public boolean isVerifyPeer() {
return clientConfiguration.isVerifyPeer();
}
/**
* Sets to use verify certificate validity/hostname check when SSL is used.
*
* @param verifyPeer {@literal false} not to verify hostname.
* @deprecated since 2.0, configure peer verification using {@link LettuceClientConfiguration}.
* @throws IllegalStateException if {@link LettuceClientConfiguration} is immutable.
*/
@Deprecated
public void setVerifyPeer(boolean verifyPeer) {
getMutableConfiguration().setVerifyPeer(verifyPeer);
}
/**
* Returns whether to issue a StartTLS.
*
* @return use of StartTLS.
*/
public boolean isStartTls() {
return clientConfiguration.isStartTls();
}
/**
* Sets to issue StartTLS.
*
* @param startTls {@literal true} to issue StartTLS.
* @deprecated since 2.0, configure StartTLS using {@link LettuceClientConfiguration}.
* @throws IllegalStateException if {@link LettuceClientConfiguration} is immutable.
*/
@Deprecated
public void setStartTls(boolean startTls) {
getMutableConfiguration().setStartTls(startTls);
}
/**
* Indicates if validation of the native Lettuce connection is enabled.
*
* @return connection validation enabled.
*/
public boolean getValidateConnection() {
return validateConnection;
}
/**
* Enables validation of the shared native Lettuce connection on calls to {@link #getConnection()}. A new connection
* will be created and used if validation fails.
* <p>
* Lettuce will automatically reconnect until close is called, which should never happen through
* {@link LettuceConnection} if a shared native connection is used, therefore the default is {@literal false}.
* <p>
* Setting this to {@literal true} will result in a round-trip call to the server on each new connection, so this
* setting should only be used if connection sharing is enabled and there is code that is actively closing the native
* Lettuce connection.
*
* @param validateConnection enable connection validation.
*/
public void setValidateConnection(boolean validateConnection) {
this.validateConnection = validateConnection;
}
/**
* Indicates if multiple {@link LettuceConnection}s should share a single native connection.
*
* @return native connection shared.
*/
public boolean getShareNativeConnection() {
return shareNativeConnection;
}
/**
* Enables multiple {@link LettuceConnection}s to share a single native connection. If set to {@literal false}, every
* operation on {@link LettuceConnection} will open and close a socket.
*
* @param shareNativeConnection enable connection sharing.
*/
public void setShareNativeConnection(boolean shareNativeConnection) {
this.shareNativeConnection = shareNativeConnection;
}
/**
* Indicates {@link #setShareNativeConnection(boolean) shared connections} should be eagerly initialized. Eager
* initialization requires a running Redis instance during application startup to allow early validation of connection
* factory configuration. Eager initialization also prevents blocking connect while using reactive API and is
* recommended for reactive API usage.
*
* @return {@link true} if the shared connection is initialized upon {@link #afterPropertiesSet()}.
* @since 2.2
*/
public boolean getEagerInitialization() {
return eagerInitialization;
}
/**
* Enables eager initialization of {@link #setShareNativeConnection(boolean) shared connections}.
*
* @param eagerInitialization enable eager connection shared connection initialization upon
* {@link #afterPropertiesSet()}.
* @since 2.2
*/
public void setEagerInitialization(boolean eagerInitialization) {
this.eagerInitialization = eagerInitialization;
}
/**
* Returns the index of the database.
*
* @return the database index.
*/
public int getDatabase() {
return RedisConfiguration.getDatabaseOrElse(configuration, standaloneConfig::getDatabase);
}
/**
* Sets the index of the database used by this connection factory. Default is 0.
*
* @param index database index
*/
public void setDatabase(int index) {
Assert.isTrue(index >= 0, "invalid DB index (a positive index required)");
if (RedisConfiguration.isDatabaseIndexAware(configuration)) {
((WithDatabaseIndex) configuration).setDatabase(index);
return;
}
standaloneConfig.setDatabase(index);
}
/**
* Returns the client name.
*
* @return the client name or {@literal null} if not set.
* @since 2.1
*/
@Nullable
public String getClientName() {
return clientConfiguration.getClientName().orElse(null);
}
/**
* Sets the client name used by this connection factory.
*
* @param clientName the client name. Can be {@literal null}.
* @since 2.1
* @deprecated configure the client name using {@link LettuceClientConfiguration}.
* @throws IllegalStateException if {@link LettuceClientConfiguration} is immutable.
*/
@Deprecated
public void setClientName(@Nullable String clientName) {
this.getMutableConfiguration().setClientName(clientName);
}
/**
* Returns the native {@link AbstractRedisClient} used by this instance. The client is initialized as part of
* {@link #afterPropertiesSet() the bean initialization lifecycle} and only available when this connection factory is
* initialized.
* <p>
* Depending on the configuration, the client can be either {@link RedisClient} or {@link RedisClusterClient}.
*
* @return the native {@link AbstractRedisClient}. Can be {@literal null} if not initialized.
* @since 2.5
* @see #afterPropertiesSet()
*/
@Nullable
public AbstractRedisClient getNativeClient() {
assertInitialized();
return this.client;
}
/**
* Returns the native {@link AbstractRedisClient} used by this instance. The client is initialized as part of
* {@link #afterPropertiesSet() the bean initialization lifecycle} and only available when this connection factory is
* initialized. Throws {@link IllegalStateException} if not yet initialized.
* <p>
* Depending on the configuration, the client can be either {@link RedisClient} or {@link RedisClusterClient}.
*
* @return the native {@link AbstractRedisClient}.
* @since 2.5
* @throws IllegalStateException if not yet initialized.
* @see #getNativeClient()
*/
public AbstractRedisClient getRequiredNativeClient() {
AbstractRedisClient client = getNativeClient();
Assert.state(client != null, "Client not yet initialized; Did you forget to call initialize the bean");
return client;
}
@Nullable
private String getRedisUsername() {
return RedisConfiguration.getUsernameOrElse(configuration, standaloneConfig::getUsername);
}
/**
* Returns the password used for authenticating with the Redis server.
*
* @return password for authentication or {@literal null} if not set.
*/
@Nullable
public String getPassword() {
return getRedisPassword().map(String::new).orElse(null);
}
private RedisPassword getRedisPassword() {
return RedisConfiguration.getPasswordOrElse(configuration, standaloneConfig::getPassword);
}
/**
* Sets the password used for authenticating with the Redis server.
*
* @param password the password to set
* @deprecated since 2.0, configure the password using {@link RedisStandaloneConfiguration},
* {@link RedisSentinelConfiguration} or {@link RedisClusterConfiguration}.
*/
@Deprecated
public void setPassword(String password) {
if (RedisConfiguration.isAuthenticationAware(configuration)) {
((WithPassword) configuration).setPassword(password);
return;
}
standaloneConfig.setPassword(RedisPassword.of(password));
}
/**
* Returns the shutdown timeout for shutting down the RedisClient (in milliseconds).
*
* @return shutdown timeout.
* @since 1.6
*/
public long getShutdownTimeout() {
return clientConfiguration.getShutdownTimeout().toMillis();
}
/**
* Sets the shutdown timeout for shutting down the RedisClient (in milliseconds).
*
* @param shutdownTimeout the shutdown timeout.
* @since 1.6
* @deprecated since 2.0, configure the shutdown timeout using {@link LettuceClientConfiguration}.
* @throws IllegalStateException if {@link LettuceClientConfiguration} is immutable.
*/
@Deprecated
public void setShutdownTimeout(long shutdownTimeout) {
getMutableConfiguration().setShutdownTimeout(Duration.ofMillis(shutdownTimeout));
}
/**
* Get the {@link ClientResources} to reuse infrastructure.
*
* @return {@literal null} if not set.
* @since 1.7
*/
public ClientResources getClientResources() {
return clientConfiguration.getClientResources().orElse(null);
}
/**
* Sets the {@link ClientResources} to reuse the client infrastructure. <br />
* Set to {@literal null} to not share resources.
*
* @param clientResources can be {@literal null}.
* @since 1.7
* @deprecated since 2.0, configure {@link ClientResources} using {@link LettuceClientConfiguration}.
* @throws IllegalStateException if {@link LettuceClientConfiguration} is immutable.
*/
@Deprecated
public void setClientResources(ClientResources clientResources) {
getMutableConfiguration().setClientResources(clientResources);
}
/**
* @return the {@link LettuceClientConfiguration}.
* @since 2.0
*/
public LettuceClientConfiguration getClientConfiguration() {
return clientConfiguration;
}
/**
* @return the {@link RedisStandaloneConfiguration}.
* @since 2.0
*/
public RedisStandaloneConfiguration getStandaloneConfiguration() {
return standaloneConfig;
}
/**
* @return the {@link RedisSocketConfiguration} or {@literal null} if not set.
* @since 2.1
*/
@Nullable
public RedisSocketConfiguration getSocketConfiguration() {
return isDomainSocketAware() ? (RedisSocketConfiguration) configuration : null;
}
/**
* @return the {@link RedisSentinelConfiguration}, may be {@literal null}.
* @since 2.0
*/
@Nullable
public RedisSentinelConfiguration getSentinelConfiguration() {
return isRedisSentinelAware() ? (RedisSentinelConfiguration) configuration : null;
}
/**
* @return the {@link RedisClusterConfiguration}, may be {@literal null}.
* @since 2.0
*/
@Nullable
public RedisClusterConfiguration getClusterConfiguration() {
return isClusterAware() ? (RedisClusterConfiguration) configuration : null;
}
/**
* Specifies if pipelined results should be converted to the expected data type. If false, results of
* {@link LettuceConnection#closePipeline()} and {LettuceConnection#exec()} will be of the type returned by the
* Lettuce driver.
*
* @return Whether or not to convert pipeline and tx results.
*/
public boolean getConvertPipelineAndTxResults() {
return convertPipelineAndTxResults;
}
/**
* Specifies if pipelined and transaction results should be converted to the expected data type. If false, results of
* {@link LettuceConnection#closePipeline()} and {LettuceConnection#exec()} will be of the type returned by the
* Lettuce driver.
*
* @param convertPipelineAndTxResults Whether or not to convert pipeline and tx results.
*/
public void setConvertPipelineAndTxResults(boolean convertPipelineAndTxResults) {
this.convertPipelineAndTxResults = convertPipelineAndTxResults;
}
/**
* @return true when {@link RedisStaticMasterReplicaConfiguration} is present.
* @since 2.1
*/
private boolean isStaticMasterReplicaAware() {
return RedisConfiguration.isStaticMasterReplicaConfiguration(configuration);
}
/**
* @return true when {@link RedisSentinelConfiguration} is present.
* @since 1.5
*/
public boolean isRedisSentinelAware() {
return RedisConfiguration.isSentinelConfiguration(configuration);
}
/**
* @return true when {@link RedisSocketConfiguration} is present.
* @since 2.1
*/
private boolean isDomainSocketAware() {
return RedisConfiguration.isDomainSocketConfiguration(configuration);
}
/**
* @return true when {@link RedisClusterConfiguration} is present.
* @since 1.7
*/
public boolean isClusterAware() {
return RedisConfiguration.isClusterConfiguration(configuration);
}
/**
* @return the shared connection using {@literal byte[]} encoding for imperative API use. {@literal null} if
* {@link #getShareNativeConnection() connection sharing} is disabled or when connected to Redis Cluster.
*/
@Nullable
protected StatefulRedisConnection<byte[], byte[]> getSharedConnection() {
return shareNativeConnection && !isClusterAware()
? (StatefulRedisConnection) getOrCreateSharedConnection().getConnection()
: null;
}
/**
* @return the shared cluster connection using {@literal byte[]} encoding for imperative API use. {@literal null} if
* {@link #getShareNativeConnection() connection sharing} is disabled or when connected to Redis
* Standalone/Sentinel/Master-Replica.
* @since 2.5.7
*/
@Nullable
protected StatefulRedisClusterConnection<byte[], byte[]> getSharedClusterConnection() {
return shareNativeConnection && isClusterAware()
? (StatefulRedisClusterConnection) getOrCreateSharedConnection().getConnection()
: null;
}
/**
* @return the shared connection using {@link ByteBuffer} encoding for reactive API use. {@literal null} if
* {@link #getShareNativeConnection() connection sharing} is disabled.
* @since 2.0.1
*/
@Nullable
protected StatefulConnection<ByteBuffer, ByteBuffer> getSharedReactiveConnection() {
return shareNativeConnection ? getOrCreateSharedReactiveConnection().getConnection() : null;
}
private LettuceConnectionProvider createConnectionProvider(AbstractRedisClient client, RedisCodec<?, ?> codec) {
LettuceConnectionProvider connectionProvider = doCreateConnectionProvider(client, codec);
if (this.clientConfiguration instanceof LettucePoolingClientConfiguration) {
return new LettucePoolingConnectionProvider(connectionProvider,
(LettucePoolingClientConfiguration) this.clientConfiguration);
}
return connectionProvider;
}
/**
* Create a {@link LettuceConnectionProvider} given {@link AbstractRedisClient} and {@link RedisCodec}. Configuration
* of this connection factory specifies the type of the created connection provider. This method creates either a
* {@link LettuceConnectionProvider} for either {@link RedisClient} or {@link RedisClusterClient}. Subclasses may
* override this method to decorate the connection provider.
*
* @param client either {@link RedisClient} or {@link RedisClusterClient}, must not be {@literal null}.
* @param codec used for connection creation, must not be {@literal null}. By default, a {@code byte[]} codec.
* Reactive connections require a {@link java.nio.ByteBuffer} codec.
* @return the connection provider.
* @since 2.1
*/
protected LettuceConnectionProvider doCreateConnectionProvider(AbstractRedisClient client, RedisCodec<?, ?> codec) {
ReadFrom readFrom = getClientConfiguration().getReadFrom().orElse(null);
if (isStaticMasterReplicaAware()) {
List<RedisURI> nodes = ((RedisStaticMasterReplicaConfiguration) configuration).getNodes().stream() //
.map(it -> createRedisURIAndApplySettings(it.getHostName(), it.getPort())) //
.peek(it -> it.setDatabase(getDatabase())) //
.collect(Collectors.toList());
return new StaticMasterReplicaConnectionProvider((RedisClient) client, codec, nodes, readFrom);
}
if (isClusterAware()) {
return new ClusterConnectionProvider((RedisClusterClient) client, codec, readFrom);
}
return new StandaloneConnectionProvider((RedisClient) client, codec, readFrom);
}
protected AbstractRedisClient createClient() {
if (isStaticMasterReplicaAware()) {
RedisClient redisClient = clientConfiguration.getClientResources() //
.map(RedisClient::create) //
.orElseGet(RedisClient::create);
clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);
return redisClient;
}
if (isRedisSentinelAware()) {
RedisURI redisURI = getSentinelRedisURI();
RedisClient redisClient = clientConfiguration.getClientResources() //
.map(clientResources -> RedisClient.create(clientResources, redisURI)) //
.orElseGet(() -> RedisClient.create(redisURI));
clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);
return redisClient;
}
if (isClusterAware()) {
List<RedisURI> initialUris = new ArrayList<>();
ClusterConfiguration configuration = (ClusterConfiguration) this.configuration;
for (RedisNode node : configuration.getClusterNodes()) {
initialUris.add(createRedisURIAndApplySettings(node.getHost(), node.getPort()));
}
RedisClusterClient clusterClient = clientConfiguration.getClientResources() //
.map(clientResources -> RedisClusterClient.create(clientResources, initialUris)) //
.orElseGet(() -> RedisClusterClient.create(initialUris));
clusterClient.setOptions(getClusterClientOptions(configuration));
return clusterClient;
}
RedisURI uri = isDomainSocketAware()
? createRedisSocketURIAndApplySettings(((DomainSocketConfiguration) configuration).getSocket())
: createRedisURIAndApplySettings(getHostName(), getPort());
RedisClient redisClient = clientConfiguration.getClientResources() //
.map(clientResources -> RedisClient.create(clientResources, uri)) //
.orElseGet(() -> RedisClient.create(uri));
clientConfiguration.getClientOptions().ifPresent(redisClient::setOptions);
return redisClient;
}
private ClusterClientOptions getClusterClientOptions(ClusterConfiguration configuration) {
Optional<ClientOptions> clientOptions = clientConfiguration.getClientOptions();
ClusterClientOptions clusterClientOptions = clientOptions //
.filter(ClusterClientOptions.class::isInstance) //
.map(ClusterClientOptions.class::cast) //
.orElseGet(() -> {
return clientOptions //
.map(it -> ClusterClientOptions.builder(it).build()) //
.orElseGet(ClusterClientOptions::create);
});
if (configuration.getMaxRedirects() != null) {
return clusterClientOptions.mutate().maxRedirects(configuration.getMaxRedirects()).build();
}
return clusterClientOptions;
}
private RedisURI getSentinelRedisURI() {
RedisURI redisUri = LettuceConverters.sentinelConfigurationToRedisURI(
(org.springframework.data.redis.connection.RedisSentinelConfiguration) configuration);
applyToAll(redisUri, it -> {
clientConfiguration.getClientName().ifPresent(it::setClientName);
it.setSsl(clientConfiguration.isUseSsl());
it.setVerifyPeer(clientConfiguration.isVerifyPeer());
it.setStartTls(clientConfiguration.isStartTls());
it.setTimeout(clientConfiguration.getCommandTimeout());
});
redisUri.setDatabase(getDatabase());
return redisUri;
}
private void assertInitialized() {
Assert.state(this.initialized, "LettuceConnectionFactory was not initialized through afterPropertiesSet()");
Assert.state(!this.destroyed, "LettuceConnectionFactory was destroyed and cannot be used anymore");
}
private static void applyToAll(RedisURI source, Consumer<RedisURI> action) {
action.accept(source);
source.getSentinels().forEach(action);
}
private RedisURI createRedisURIAndApplySettings(String host, int port) {
RedisURI.Builder builder = RedisURI.Builder.redis(host, port);
applyAuthentication(builder);
clientConfiguration.getClientName().ifPresent(builder::withClientName);
builder.withDatabase(getDatabase());
builder.withSsl(clientConfiguration.isUseSsl());
builder.withVerifyPeer(clientConfiguration.isVerifyPeer());
builder.withStartTls(clientConfiguration.isStartTls());
builder.withTimeout(clientConfiguration.getCommandTimeout());
return builder.build();
}
private RedisURI createRedisSocketURIAndApplySettings(String socketPath) {
RedisURI.Builder builder = RedisURI.Builder.socket(socketPath);
applyAuthentication(builder);
builder.withDatabase(getDatabase());
builder.withTimeout(clientConfiguration.getCommandTimeout());
return builder.build();
}
private void applyAuthentication(RedisURI.Builder builder) {
String username = getRedisUsername();
if (StringUtils.hasText(username)) {
// See https://github.com/lettuce-io/lettuce-core/issues/1404
builder.withAuthentication(username, new String(getRedisPassword().toOptional().orElse(new char[0])));
} else {
getRedisPassword().toOptional().ifPresent(builder::withPassword);
}
}
@Override
public RedisSentinelConnection getSentinelConnection() {
assertInitialized();
return new LettuceSentinelConnection(connectionProvider);
}
private MutableLettuceClientConfiguration getMutableConfiguration() {
Assert.state(clientConfiguration instanceof MutableLettuceClientConfiguration,
() -> String.format("Client configuration must be instance of MutableLettuceClientConfiguration but is %s",
ClassUtils.getShortName(clientConfiguration.getClass())));
return (MutableLettuceClientConfiguration) clientConfiguration;
}
private long getClientTimeout() {
return clientConfiguration.getCommandTimeout().toMillis();
}
/**
* Wrapper for shared connections. Keeps track of the connection lifecycleThe wrapper is thread-safe as it
* synchronizes concurrent calls by blocking.
*
* @param <E> connection encoding.
* @author Mark Paluch
* @author Christoph Strobl
* @since 2.1
*/
class SharedConnection<E> {
private final LettuceConnectionProvider connectionProvider;
/** Synchronization monitor for the shared Connection */
private final Object connectionMonitor = new Object();
private @Nullable StatefulConnection<E, E> connection;
SharedConnection(LettuceConnectionProvider connectionProvider) {
this.connectionProvider = connectionProvider;
}
/**
* Returns a valid Lettuce connection. Initializes and validates the connection if
* {@link #setValidateConnection(boolean) enabled}.
*
* @return the connection.
*/
@Nullable
StatefulConnection<E, E> getConnection() {
synchronized (this.connectionMonitor) {
if (this.connection == null) {
this.connection = getNativeConnection();
}
if (getValidateConnection()) {
validateConnection();
}
return this.connection;
}
}
/**
* Obtain a connection from the associated {@link LettuceConnectionProvider}.
*
* @return the connection.
*/
private StatefulConnection<E, E> getNativeConnection() {
return connectionProvider.getConnection(StatefulConnection.class);
}
/**
* Validate the connection. Invalid connections will be closed and the connection state will be reset.
*/
void validateConnection() {
synchronized (this.connectionMonitor) {
boolean valid = false;
if (connection != null && connection.isOpen()) {
try {
if (connection instanceof StatefulRedisConnection) {
((StatefulRedisConnection) connection).sync().ping();
}
if (connection instanceof StatefulRedisClusterConnection) {
((StatefulRedisClusterConnection) connection).sync().ping();
}
valid = true;
} catch (Exception e) {
log.debug("Validation failed", e);
}
}
if (!valid) {
log.info("Validation of shared connection failed; Creating a new connection.");
resetConnection();
this.connection = getNativeConnection();
}
}
}
/**
* Reset the underlying shared Connection, to be reinitialized on next access.
*/
void resetConnection() {
synchronized (this.connectionMonitor) {
if (this.connection != null) {
this.connectionProvider.release(this.connection);
}
this.connection = null;
}
}
}
/**
* Mutable implementation of {@link LettuceClientConfiguration}.
*
* @author Mark Paluch
* @author Christoph Strobl
*/
static class MutableLettuceClientConfiguration implements LettuceClientConfiguration {
private boolean useSsl;
private boolean verifyPeer = true;
private boolean startTls;
private @Nullable ClientResources clientResources;
private @Nullable String clientName;
private Duration timeout = Duration.ofSeconds(RedisURI.DEFAULT_TIMEOUT);
private Duration shutdownTimeout = Duration.ofMillis(100);
@Override
public boolean isUseSsl() {
return useSsl;
}
void setUseSsl(boolean useSsl) {
this.useSsl = useSsl;
}
@Override
public boolean isVerifyPeer() {
return verifyPeer;
}
void setVerifyPeer(boolean verifyPeer) {
this.verifyPeer = verifyPeer;
}
@Override
public boolean isStartTls() {
return startTls;
}
void setStartTls(boolean startTls) {
this.startTls = startTls;
}
@Override
public Optional<ClientResources> getClientResources() {
return Optional.ofNullable(clientResources);
}
void setClientResources(ClientResources clientResources) {
this.clientResources = clientResources;
}
@Override
public Optional<ClientOptions> getClientOptions() {
return Optional.empty();
}
@Override
public Optional<ReadFrom> getReadFrom() {
return Optional.empty();
}
@Override
public Optional<String> getClientName() {
return Optional.ofNullable(clientName);
}
/**
* @param clientName can be {@literal null}.
* @since 2.1
*/
void setClientName(@Nullable String clientName) {
this.clientName = clientName;
}
@Override
public Duration getCommandTimeout() {
return timeout;
}
void setTimeout(Duration timeout) {
this.timeout = timeout;
}
@Override
public Duration getShutdownTimeout() {
return shutdownTimeout;
}
void setShutdownTimeout(Duration shutdownTimeout) {
this.shutdownTimeout = shutdownTimeout;
}
@Override
public Duration getShutdownQuietPeriod() {
return shutdownTimeout;
}
}
/**
* {@link LettuceConnectionProvider} that translates connection exceptions into {@link RedisConnectionException}.
*/
private static class ExceptionTranslatingConnectionProvider
implements LettuceConnectionProvider, LettuceConnectionProvider.TargetAware, DisposableBean {
private final LettuceConnectionProvider delegate;
public ExceptionTranslatingConnectionProvider(LettuceConnectionProvider delegate) {
this.delegate = delegate;
}
@Override
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
try {
return delegate.getConnection(connectionType);
} catch (RuntimeException e) {
throw translateException(e);
}
}
@Override
public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType, RedisURI redisURI) {
try {
return ((TargetAware) delegate).getConnection(connectionType, redisURI);
} catch (RuntimeException e) {
throw translateException(e);
}
}
@Override
public <T extends StatefulConnection<?, ?>> CompletionStage<T> getConnectionAsync(Class<T> connectionType) {
CompletableFuture<T> future = new CompletableFuture<>();
delegate.getConnectionAsync(connectionType).whenComplete((t, throwable) -> {
if (throwable != null) {
future.completeExceptionally(translateException(throwable));
} else {
future.complete(t);
}
});
return future;
}
@Override
public <T extends StatefulConnection<?, ?>> CompletionStage<T> getConnectionAsync(Class<T> connectionType,
RedisURI redisURI) {
CompletableFuture<T> future = new CompletableFuture<>();
((TargetAware) delegate).getConnectionAsync(connectionType, redisURI).whenComplete((t, throwable) -> {
if (throwable != null) {
future.completeExceptionally(translateException(throwable));
} else {
future.complete(t);
}
});
return future;
}
@Override
public void release(StatefulConnection<?, ?> connection) {
delegate.release(connection);
}
@Override
public CompletableFuture<Void> releaseAsync(StatefulConnection<?, ?> connection) {
return delegate.releaseAsync(connection);
}
@Override
public void destroy() throws Exception {
if (delegate instanceof DisposableBean) {
((DisposableBean) delegate).destroy();
}
}
private RuntimeException translateException(Throwable e) {
return e instanceof RedisConnectionFailureException ? (RedisConnectionFailureException) e
: new RedisConnectionFailureException("Unable to connect to Redis", e);
}
}
}
相关信息
相关文章
spring-data-redis ClusterConnectionProvider 源码
spring-data-redis DefaultLettuceClientConfiguration 源码
spring-data-redis DefaultLettucePoolingClientConfiguration 源码
spring-data-redis LettuceByteBufferPubSubListenerWrapper 源码
spring-data-redis LettuceClientConfiguration 源码
spring-data-redis LettuceClusterConnection 源码
spring-data-redis LettuceClusterGeoCommands 源码
spring-data-redis LettuceClusterHashCommands 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦