spring-data-redis LettuceConnection 源码
spring-data-redis LettuceConnection 代码
文件路径:/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceConnection.java
/*
* Copyright 2011-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.connection.lettuce;
import static io.lettuce.core.protocol.CommandType.*;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.LettuceFutures;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
import io.lettuce.core.TransactionResult;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.async.RedisAsyncCommands;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.async.RedisClusterAsyncCommands;
import io.lettuce.core.cluster.api.sync.RedisClusterCommands;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.output.*;
import io.lettuce.core.protocol.Command;
import io.lettuce.core.protocol.CommandArgs;
import io.lettuce.core.protocol.CommandType;
import io.lettuce.core.protocol.ProtocolKeyword;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.sentinel.api.StatefulRedisSentinelConnection;
import java.lang.reflect.Constructor;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.core.convert.converter.Converter;
import org.springframework.dao.DataAccessException;
import org.springframework.dao.InvalidDataAccessApiUsageException;
import org.springframework.dao.QueryTimeoutException;
import org.springframework.data.redis.ExceptionTranslationStrategy;
import org.springframework.data.redis.FallbackExceptionTranslationStrategy;
import org.springframework.data.redis.RedisSystemException;
import org.springframework.data.redis.connection.*;
import org.springframework.data.redis.connection.convert.TransactionResultConverter;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider.TargetAware;
import org.springframework.data.redis.connection.lettuce.LettuceResult.LettuceResultBuilder;
import org.springframework.data.redis.connection.lettuce.LettuceResult.LettuceStatusResult;
import org.springframework.data.redis.core.RedisCommand;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.ObjectUtils;
/**
* {@code RedisConnection} implementation on top of <a href="https://github.com/mp911de/lettuce">Lettuce</a> Redis
* client.
*
* @author Costin Leau
* @author Jennifer Hickey
* @author Christoph Strobl
* @author Thomas Darimont
* @author David Liu
* @author Mark Paluch
* @author Ninad Divadkar
* @author Tamil Selvan
* @author ihaohong
*/
public class LettuceConnection extends AbstractRedisConnection {
private final Log LOGGER = LogFactory.getLog(getClass());
static final RedisCodec<byte[], byte[]> CODEC = ByteArrayCodec.INSTANCE;
private static final ExceptionTranslationStrategy EXCEPTION_TRANSLATION = new FallbackExceptionTranslationStrategy(
LettuceExceptionConverter.INSTANCE);
private static final TypeHints typeHints = new TypeHints();
private final LettuceGeoCommands geoCommands = new LettuceGeoCommands(this);
private final LettuceHashCommands hashCommands = new LettuceHashCommands(this);
private final LettuceHyperLogLogCommands hllCommands = new LettuceHyperLogLogCommands(this);
private final LettuceKeyCommands keyCommands = new LettuceKeyCommands(this);
private final LettuceListCommands listCommands = new LettuceListCommands(this);
private final LettuceScriptingCommands scriptingCommands = new LettuceScriptingCommands(this);
private final LettuceSetCommands setCommands = new LettuceSetCommands(this);
private final LettuceServerCommands serverCommands = new LettuceServerCommands(this);
private final LettuceStreamCommands streamCommands = new LettuceStreamCommands(this);
private final LettuceStringCommands stringCommands = new LettuceStringCommands(this);
private final LettuceZSetCommands zSetCommands = new LettuceZSetCommands(this);
private final int defaultDbIndex;
private int dbIndex;
private final LettuceConnectionProvider connectionProvider;
private final @Nullable StatefulConnection<byte[], byte[]> asyncSharedConn;
private @Nullable StatefulConnection<byte[], byte[]> asyncDedicatedConn;
private final long timeout;
// refers only to main connection as pubsub happens on a different one
private boolean isClosed = false;
private boolean isMulti = false;
private boolean isPipelined = false;
private @Nullable List<LettuceResult<?, ?>> ppline;
private @Nullable PipeliningFlushState flushState;
private final Queue<FutureResult<?>> txResults = new LinkedList<>();
private volatile @Nullable LettuceSubscription subscription;
/** flag indicating whether the connection needs to be dropped or not */
private boolean convertPipelineAndTxResults = true;
private PipeliningFlushPolicy pipeliningFlushPolicy = PipeliningFlushPolicy.flushEachCommand();
LettuceResult<?, ?> newLettuceResult(Future<?> resultHolder) {
return newLettuceResult(resultHolder, (val) -> val);
}
<T, R> LettuceResult<T, R> newLettuceResult(Future<T> resultHolder, Converter<T, R> converter) {
return LettuceResultBuilder.<T, R> forResponse(resultHolder).mappedWith(converter)
.convertPipelineAndTxResults(convertPipelineAndTxResults).build();
}
<T, R> LettuceResult<T, R> newLettuceResult(Future<T> resultHolder, Converter<T, R> converter,
Supplier<R> defaultValue) {
return LettuceResultBuilder.<T, R> forResponse(resultHolder).mappedWith(converter)
.convertPipelineAndTxResults(convertPipelineAndTxResults).defaultNullTo(defaultValue).build();
}
<T, R> LettuceResult<T, R> newLettuceStatusResult(Future<T> resultHolder) {
return LettuceResultBuilder.<T, R> forResponse(resultHolder).buildStatusResult();
}
private class LettuceTransactionResultConverter<T> extends TransactionResultConverter<T> {
public LettuceTransactionResultConverter(Queue<FutureResult<T>> txResults,
Converter<Exception, DataAccessException> exceptionConverter) {
super(txResults, exceptionConverter);
}
@Override
public List<Object> convert(List<Object> execResults) {
// Lettuce Empty list means null (watched variable modified)
if (execResults.isEmpty()) {
return null;
}
return super.convert(execResults);
}
}
/**
* Instantiates a new lettuce connection.
*
* @param timeout The connection timeout (in milliseconds)
* @param client The {@link RedisClient} to use when instantiating a native connection
*/
public LettuceConnection(long timeout, RedisClient client) {
this(null, timeout, client);
}
/**
* Instantiates a new lettuce connection.
*
* @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s. Will not be used
* for transactions or blocking operations
* @param timeout The connection timeout (in milliseconds)
* @param client The {@link RedisClient} to use when making pub/sub, blocking, and tx connections
*/
public LettuceConnection(@Nullable StatefulRedisConnection<byte[], byte[]> sharedConnection, long timeout,
RedisClient client) {
this(sharedConnection, timeout, client, 0);
}
/**
* @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s. Should not be
* used for transactions or blocking operations.
* @param timeout The connection timeout (in milliseconds)
* @param client The {@link RedisClient} to use when making pub/sub connections.
* @param defaultDbIndex The db index to use along with {@link RedisClient} when establishing a dedicated connection.
* @since 1.7
*/
public LettuceConnection(@Nullable StatefulRedisConnection<byte[], byte[]> sharedConnection, long timeout,
@Nullable AbstractRedisClient client, int defaultDbIndex) {
this.connectionProvider = new StandaloneConnectionProvider((RedisClient) client, CODEC);
this.asyncSharedConn = sharedConnection;
this.timeout = timeout;
this.defaultDbIndex = defaultDbIndex;
this.dbIndex = this.defaultDbIndex;
}
/**
* @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s. Should not be
* used for transactions or blocking operations.
* @param connectionProvider connection provider to obtain and release native connections.
* @param timeout The connection timeout (in milliseconds)
* @param defaultDbIndex The db index to use along with {@link RedisClient} when establishing a dedicated connection.
* @since 2.0
*/
public LettuceConnection(@Nullable StatefulRedisConnection<byte[], byte[]> sharedConnection,
LettuceConnectionProvider connectionProvider, long timeout, int defaultDbIndex) {
this((StatefulConnection<byte[], byte[]>) sharedConnection, connectionProvider, timeout, defaultDbIndex);
}
/**
* @param sharedConnection A native connection that is shared with other {@link LettuceConnection}s. Should not be
* used for transactions or blocking operations.
* @param connectionProvider connection provider to obtain and release native connections.
* @param timeout The connection timeout (in milliseconds)
* @param defaultDbIndex The db index to use along with {@link RedisClient} when establishing a dedicated connection.
* @since 2.1
*/
LettuceConnection(@Nullable StatefulConnection<byte[], byte[]> sharedConnection,
LettuceConnectionProvider connectionProvider, long timeout, int defaultDbIndex) {
Assert.notNull(connectionProvider, "LettuceConnectionProvider must not be null");
this.asyncSharedConn = sharedConnection;
this.connectionProvider = connectionProvider;
this.timeout = timeout;
this.defaultDbIndex = defaultDbIndex;
this.dbIndex = this.defaultDbIndex;
}
protected DataAccessException convertLettuceAccessException(Exception ex) {
return EXCEPTION_TRANSLATION.translate(ex);
}
@Override
public org.springframework.data.redis.connection.RedisCommands commands() {
return this;
}
@Override
public RedisGeoCommands geoCommands() {
return geoCommands;
}
@Override
public RedisHashCommands hashCommands() {
return hashCommands;
}
@Override
public RedisHyperLogLogCommands hyperLogLogCommands() {
return hllCommands;
}
@Override
public RedisKeyCommands keyCommands() {
return keyCommands;
}
@Override
public RedisListCommands listCommands() {
return listCommands;
}
@Override
public RedisScriptingCommands scriptingCommands() {
return scriptingCommands;
}
@Override
public RedisSetCommands setCommands() {
return setCommands;
}
@Override
public RedisServerCommands serverCommands() {
return serverCommands;
}
@Override
public RedisStreamCommands streamCommands() {
return streamCommands;
}
@Override
public RedisStringCommands stringCommands() {
return stringCommands;
}
@Override
public RedisZSetCommands zSetCommands() {
return zSetCommands;
}
@Override
public Object execute(String command, byte[]... args) {
return execute(command, null, args);
}
/**
* 'Native' or 'raw' execution of the given command along-side the given arguments.
*
* @see RedisConnection#execute(String, byte[]...)
* @param command Command to execute
* @param commandOutputTypeHint Type of Output to use, may be (may be {@literal null}).
* @param args Possible command arguments (may be {@literal null})
* @return execution result.
*/
@Nullable
@SuppressWarnings({ "rawtypes", "unchecked" })
public Object execute(String command, @Nullable CommandOutput commandOutputTypeHint, byte[]... args) {
Assert.hasText(command, "a valid command needs to be specified");
String name = command.trim().toUpperCase();
ProtocolKeyword commandType = getCommandType(name);
validateCommandIfRunningInTransactionMode(commandType, args);
CommandArgs<byte[], byte[]> cmdArg = new CommandArgs<>(CODEC);
if (!ObjectUtils.isEmpty(args)) {
cmdArg.addKeys(args);
}
CommandOutput expectedOutput = commandOutputTypeHint != null ? commandOutputTypeHint
: typeHints.getTypeHint(commandType);
Command cmd = new Command(commandType, expectedOutput, cmdArg);
return invoke().just(RedisClusterAsyncCommands::dispatch, cmd.getType(), cmd.getOutput(), cmd.getArgs());
}
@Override
public void close() {
super.close();
if (isClosed) {
return;
}
isClosed = true;
try {
reset();
} catch (RuntimeException e) {
LOGGER.debug("Failed to reset connection during close", e);
}
}
private void reset() {
if (asyncDedicatedConn != null) {
try {
if (customizedDatabaseIndex()) {
potentiallySelectDatabase(defaultDbIndex);
}
connectionProvider.release(asyncDedicatedConn);
asyncDedicatedConn = null;
} catch (RuntimeException ex) {
throw convertLettuceAccessException(ex);
}
}
LettuceSubscription subscription = this.subscription;
if (subscription != null) {
if (subscription.isAlive()) {
subscription.doClose();
}
this.subscription = null;
}
this.dbIndex = defaultDbIndex;
}
@Override
public boolean isClosed() {
return isClosed && !isSubscribed();
}
@Override
public RedisClusterAsyncCommands<byte[], byte[]> getNativeConnection() {
LettuceSubscription subscription = this.subscription;
return (subscription != null && subscription.isAlive() ? subscription.getNativeConnection().async()
: getAsyncConnection());
}
@Override
public boolean isQueueing() {
return isMulti;
}
@Override
public boolean isPipelined() {
return isPipelined;
}
@Override
public void openPipeline() {
if (!isPipelined) {
isPipelined = true;
ppline = new ArrayList<>();
flushState = this.pipeliningFlushPolicy.newPipeline();
flushState.onOpen(this.getOrCreateDedicatedConnection());
}
}
@Override
public List<Object> closePipeline() {
if (!isPipelined) {
return Collections.emptyList();
}
flushState.onClose(this.getOrCreateDedicatedConnection());
flushState = null;
isPipelined = false;
List<io.lettuce.core.protocol.RedisCommand<?, ?, ?>> futures = new ArrayList<>(ppline.size());
for (LettuceResult<?, ?> result : ppline) {
futures.add(result.getResultHolder());
}
try {
boolean done = LettuceFutures.awaitAll(timeout, TimeUnit.MILLISECONDS,
futures.toArray(new RedisFuture[futures.size()]));
List<Object> results = new ArrayList<>(futures.size());
Exception problem = null;
if (done) {
for (LettuceResult<?, ?> result : ppline) {
if (result.getResultHolder().getOutput().hasError()) {
Exception err = new InvalidDataAccessApiUsageException(result.getResultHolder().getOutput().getError());
// remember only the first error
if (problem == null) {
problem = err;
}
results.add(err);
} else if (!result.isStatus()) {
try {
results.add(result.conversionRequired() ? result.convert(result.get()) : result.get());
} catch (DataAccessException e) {
if (problem == null) {
problem = e;
}
results.add(e);
}
}
}
}
ppline.clear();
if (problem != null) {
throw new RedisPipelineException(problem, results);
}
if (done) {
return results;
}
throw new RedisPipelineException(new QueryTimeoutException("Redis command timed out"));
} catch (Exception e) {
throw new RedisPipelineException(e);
}
}
@Override
public byte[] echo(byte[] message) {
return invoke().just(RedisClusterAsyncCommands::echo, message);
}
@Override
public String ping() {
return invoke().just(RedisClusterAsyncCommands::ping);
}
@Override
public void discard() {
isMulti = false;
try {
if (isPipelined()) {
pipeline(newLettuceStatusResult(getAsyncDedicatedRedisCommands().discard()));
return;
}
getDedicatedRedisCommands().discard();
} catch (Exception ex) {
throw convertLettuceAccessException(ex);
} finally {
txResults.clear();
}
}
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public List<Object> exec() {
isMulti = false;
try {
Converter<Exception, DataAccessException> exceptionConverter = this::convertLettuceAccessException;
if (isPipelined()) {
RedisFuture<TransactionResult> exec = getAsyncDedicatedRedisCommands().exec();
LettuceTransactionResultConverter resultConverter = new LettuceTransactionResultConverter(
new LinkedList<>(txResults), exceptionConverter);
pipeline(newLettuceResult(exec,
source -> resultConverter.convert(LettuceConverters.transactionResultUnwrapper().convert(source))));
return null;
}
TransactionResult transactionResult = getDedicatedRedisCommands().exec();
List<Object> results = LettuceConverters.transactionResultUnwrapper().convert(transactionResult);
return convertPipelineAndTxResults
? new LettuceTransactionResultConverter(txResults, exceptionConverter).convert(results)
: results;
} catch (Exception ex) {
throw convertLettuceAccessException(ex);
} finally {
txResults.clear();
}
}
@Override
public void multi() {
if (isQueueing()) {
return;
}
isMulti = true;
try {
if (isPipelined()) {
getAsyncDedicatedRedisCommands().multi();
return;
}
getDedicatedRedisCommands().multi();
} catch (Exception ex) {
throw convertLettuceAccessException(ex);
}
}
@Override
public void select(int dbIndex) {
if (asyncSharedConn != null) {
throw new InvalidDataAccessApiUsageException("Selecting a new database not supported due to shared connection;"
+ " Use separate ConnectionFactorys to work with multiple databases");
}
this.dbIndex = dbIndex;
invokeStatus().just(RedisClusterAsyncCommands::dispatch, CommandType.SELECT,
new StatusOutput<>(ByteArrayCodec.INSTANCE), new CommandArgs<>(ByteArrayCodec.INSTANCE).add(dbIndex));
}
@Override
public void unwatch() {
try {
if (isPipelined()) {
pipeline(newLettuceStatusResult(getAsyncDedicatedRedisCommands().unwatch()));
return;
}
if (isQueueing()) {
transaction(newLettuceStatusResult(getAsyncDedicatedRedisCommands().unwatch()));
return;
}
getDedicatedRedisCommands().unwatch();
} catch (Exception ex) {
throw convertLettuceAccessException(ex);
}
}
@Override
public void watch(byte[]... keys) {
if (isQueueing()) {
throw new InvalidDataAccessApiUsageException("WATCH is not supported when a transaction is active");
}
try {
if (isPipelined()) {
pipeline(newLettuceStatusResult(getAsyncDedicatedRedisCommands().watch(keys)));
return;
}
if (isQueueing()) {
transaction(new LettuceStatusResult(getAsyncDedicatedRedisCommands().watch(keys)));
return;
}
getDedicatedRedisCommands().watch(keys);
} catch (Exception ex) {
throw convertLettuceAccessException(ex);
}
}
//
// Pub/Sub functionality
//
@Override
public Long publish(byte[] channel, byte[] message) {
return invoke().just(RedisClusterAsyncCommands::publish, channel, message);
}
@Override
public Subscription getSubscription() {
return subscription;
}
@Override
public boolean isSubscribed() {
return (subscription != null && subscription.isAlive());
}
@Override
public void pSubscribe(MessageListener listener, byte[]... patterns) {
checkSubscription();
if (isQueueing() || isPipelined()) {
throw new InvalidDataAccessApiUsageException("Transaction/Pipelining is not supported for Pub/Sub subscriptions");
}
try {
subscription = initSubscription(listener);
subscription.pSubscribe(patterns);
} catch (Exception ex) {
throw convertLettuceAccessException(ex);
}
}
@Override
public void subscribe(MessageListener listener, byte[]... channels) {
checkSubscription();
if (isQueueing() || isPipelined()) {
throw new InvalidDataAccessApiUsageException("Transaction/Pipelining is not supported for Pub/Sub subscriptions");
}
try {
subscription = initSubscription(listener);
subscription.subscribe(channels);
} catch (Exception ex) {
throw convertLettuceAccessException(ex);
}
}
@SuppressWarnings("unchecked")
<T> T failsafeReadScanValues(List<?> source, @SuppressWarnings("rawtypes") Converter converter) {
try {
return (T) (converter != null ? converter.convert(source) : source);
} catch (IndexOutOfBoundsException e) {
// ignore this one
}
return null;
}
/**
* Specifies if pipelined and transaction results should be converted to the expected data type. If false, results of
* {@link #closePipeline()} and {@link #exec()} will be of the type returned by the Lettuce driver
*
* @param convertPipelineAndTxResults Whether or not to convert pipeline and tx results
*/
public void setConvertPipelineAndTxResults(boolean convertPipelineAndTxResults) {
this.convertPipelineAndTxResults = convertPipelineAndTxResults;
}
/**
* Configures the flushing policy when using pipelining.
*
* @param pipeliningFlushPolicy the flushing policy to control when commands get written to the Redis connection.
* @see PipeliningFlushPolicy#flushEachCommand()
* @see #openPipeline()
* @see StatefulRedisConnection#flushCommands()
* @since 2.3
*/
public void setPipeliningFlushPolicy(PipeliningFlushPolicy pipeliningFlushPolicy) {
Assert.notNull(pipeliningFlushPolicy, "PipeliningFlushingPolicy must not be null");
this.pipeliningFlushPolicy = pipeliningFlushPolicy;
}
/**
* {@link #close()} the current connection and open a new pub/sub connection to the Redis server.
*
* @return never {@literal null}.
*/
@SuppressWarnings("unchecked")
protected StatefulRedisPubSubConnection<byte[], byte[]> switchToPubSub() {
checkSubscription();
reset();
return connectionProvider.getConnection(StatefulRedisPubSubConnection.class);
}
/**
* Customization hook to create a {@link LettuceSubscription}.
*
* @param listener the {@link MessageListener} to notify.
* @param connection Pub/Sub connection.
* @param connectionProvider the {@link LettuceConnectionProvider} for connection release.
* @return a {@link LettuceSubscription}.
* @since 2.2
*/
protected LettuceSubscription doCreateSubscription(MessageListener listener,
StatefulRedisPubSubConnection<byte[], byte[]> connection, LettuceConnectionProvider connectionProvider) {
return new LettuceSubscription(listener, connection, connectionProvider);
}
void pipeline(LettuceResult<?, ?> result) {
if (flushState != null) {
flushState.onCommand(getOrCreateDedicatedConnection());
}
if (isQueueing()) {
transaction(result);
} else {
ppline.add(result);
}
}
/**
* Obtain a {@link LettuceInvoker} to call Lettuce methods using the default {@link #getAsyncConnection() connection}.
*
* @return the {@link LettuceInvoker}.
* @since 2.5
*/
LettuceInvoker invoke() {
return invoke(getAsyncConnection());
}
/**
* Obtain a {@link LettuceInvoker} to call Lettuce methods using the given {@link RedisClusterAsyncCommands
* connection}.
*
* @param connection the connection to use.
* @return the {@link LettuceInvoker}.
* @since 2.5
*/
LettuceInvoker invoke(RedisClusterAsyncCommands<byte[], byte[]> connection) {
return doInvoke(connection, false);
}
/**
* Obtain a {@link LettuceInvoker} to call Lettuce methods returning a status response using the default
* {@link #getAsyncConnection() connection}. Status responses are not included in transactional and pipeline results.
*
* @return the {@link LettuceInvoker}.
* @since 2.5
*/
LettuceInvoker invokeStatus() {
return doInvoke(getAsyncConnection(), true);
}
private LettuceInvoker doInvoke(RedisClusterAsyncCommands<byte[], byte[]> connection, boolean statusCommand) {
if (isPipelined()) {
return new LettuceInvoker(connection, (future, converter, nullDefault) -> {
try {
if (statusCommand) {
pipeline(newLettuceStatusResult(future.get()));
} else {
pipeline(newLettuceResult(future.get(), converter, nullDefault));
}
} catch (Exception ex) {
throw convertLettuceAccessException(ex);
}
return null;
});
}
if (isQueueing()) {
return new LettuceInvoker(connection, (future, converter, nullDefault) -> {
try {
if (statusCommand) {
transaction(newLettuceStatusResult(future.get()));
} else {
transaction(newLettuceResult(future.get(), converter, nullDefault));
}
} catch (Exception ex) {
throw convertLettuceAccessException(ex);
}
return null;
});
}
return new LettuceInvoker(connection, (future, converter, nullDefault) -> {
try {
Object result = await(future.get());
if (result == null) {
return nullDefault.get();
}
return converter.convert(result);
} catch (Exception ex) {
throw convertLettuceAccessException(ex);
}
});
}
void transaction(FutureResult<?> result) {
txResults.add(result);
}
RedisClusterAsyncCommands<byte[], byte[]> getAsyncConnection() {
if (isQueueing() || isPipelined()) {
return getAsyncDedicatedConnection();
}
if (asyncSharedConn != null) {
if (asyncSharedConn instanceof StatefulRedisConnection) {
return ((StatefulRedisConnection<byte[], byte[]>) asyncSharedConn).async();
}
if (asyncSharedConn instanceof StatefulRedisClusterConnection) {
return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncSharedConn).async();
}
}
return getAsyncDedicatedConnection();
}
protected RedisClusterCommands<byte[], byte[]> getConnection() {
if (isQueueing()) {
return getDedicatedConnection();
}
if (asyncSharedConn != null) {
if (asyncSharedConn instanceof StatefulRedisConnection) {
return ((StatefulRedisConnection<byte[], byte[]>) asyncSharedConn).sync();
}
if (asyncSharedConn instanceof StatefulRedisClusterConnection) {
return ((StatefulRedisClusterConnection<byte[], byte[]>) asyncSharedConn).sync();
}
}
return getDedicatedConnection();
}
RedisClusterCommands<byte[], byte[]> getDedicatedConnection() {
StatefulConnection<byte[], byte[]> connection = getOrCreateDedicatedConnection();
if (connection instanceof StatefulRedisConnection) {
return ((StatefulRedisConnection<byte[], byte[]>) connection).sync();
}
if (connection instanceof StatefulRedisClusterConnection) {
return ((StatefulRedisClusterConnection<byte[], byte[]>) connection).sync();
}
throw new IllegalStateException(
String.format("%s is not a supported connection type", connection.getClass().getName()));
}
protected RedisClusterAsyncCommands<byte[], byte[]> getAsyncDedicatedConnection() {
if (isClosed()) {
throw new RedisSystemException("Connection is closed", null);
}
StatefulConnection<byte[], byte[]> connection = getOrCreateDedicatedConnection();
if (connection instanceof StatefulRedisConnection) {
return ((StatefulRedisConnection<byte[], byte[]>) connection).async();
}
if (asyncDedicatedConn instanceof StatefulRedisClusterConnection) {
return ((StatefulRedisClusterConnection<byte[], byte[]>) connection).async();
}
throw new IllegalStateException(
String.format("%s is not a supported connection type", connection.getClass().getName()));
}
@SuppressWarnings("unchecked")
protected StatefulConnection<byte[], byte[]> doGetAsyncDedicatedConnection() {
StatefulConnection connection = connectionProvider.getConnection(StatefulConnection.class);
if (customizedDatabaseIndex()) {
potentiallySelectDatabase(dbIndex);
}
return connection;
}
@Override
protected boolean isActive(RedisNode node) {
StatefulRedisSentinelConnection<String, String> connection = null;
try {
connection = getConnection(node);
return connection.sync().ping().equalsIgnoreCase("pong");
} catch (Exception e) {
return false;
} finally {
if (connection != null) {
connectionProvider.release(connection);
}
}
}
@Override
protected RedisSentinelConnection getSentinelConnection(RedisNode sentinel) {
StatefulRedisSentinelConnection<String, String> connection = getConnection(sentinel);
return new LettuceSentinelConnection(connection);
}
LettuceConnectionProvider getConnectionProvider() {
return connectionProvider;
}
@SuppressWarnings("unchecked")
private StatefulRedisSentinelConnection<String, String> getConnection(RedisNode sentinel) {
return ((TargetAware) connectionProvider).getConnection(StatefulRedisSentinelConnection.class,
getRedisURI(sentinel));
}
@Nullable
private <T> T await(RedisFuture<T> cmd) {
if (isMulti) {
return null;
}
try {
return LettuceFutures.awaitOrCancel(cmd, timeout, TimeUnit.MILLISECONDS);
} catch (RuntimeException e) {
throw convertLettuceAccessException(e);
}
}
private StatefulConnection<byte[], byte[]> getOrCreateDedicatedConnection() {
if (asyncDedicatedConn == null) {
asyncDedicatedConn = doGetAsyncDedicatedConnection();
}
return asyncDedicatedConn;
}
@SuppressWarnings("unchecked")
private RedisCommands<byte[], byte[]> getDedicatedRedisCommands() {
return (RedisCommands) getDedicatedConnection();
}
@SuppressWarnings("unchecked")
private RedisAsyncCommands<byte[], byte[]> getAsyncDedicatedRedisCommands() {
return (RedisAsyncCommands) getAsyncDedicatedConnection();
}
private void checkSubscription() {
if (isSubscribed()) {
throw new RedisSubscribedConnectionException(
"Connection already subscribed; use the connection Subscription to cancel or add new channels");
}
}
private LettuceSubscription initSubscription(MessageListener listener) {
return doCreateSubscription(listener, switchToPubSub(), connectionProvider);
}
private RedisURI getRedisURI(RedisNode node) {
return RedisURI.Builder.redis(node.getHost(), node.getPort()).build();
}
private boolean customizedDatabaseIndex() {
return defaultDbIndex != dbIndex;
}
private void potentiallySelectDatabase(int dbIndex) {
if (asyncDedicatedConn instanceof StatefulRedisConnection) {
((StatefulRedisConnection<byte[], byte[]>) asyncDedicatedConn).sync().select(dbIndex);
}
}
io.lettuce.core.ScanCursor getScanCursor(long cursorId) {
return io.lettuce.core.ScanCursor.of(Long.toString(cursorId));
}
private void validateCommandIfRunningInTransactionMode(ProtocolKeyword cmd, byte[]... args) {
if (this.isQueueing()) {
validateCommand(cmd, args);
}
}
private void validateCommand(ProtocolKeyword cmd, @Nullable byte[]... args) {
RedisCommand redisCommand = RedisCommand.failsafeCommandLookup(cmd.name());
if (!RedisCommand.UNKNOWN.equals(redisCommand) && redisCommand.requiresArguments()) {
try {
redisCommand.validateArgumentCount(args != null ? args.length : 0);
} catch (IllegalArgumentException e) {
throw new InvalidDataAccessApiUsageException(String.format("Validation failed for %s command", cmd), e);
}
}
}
private static ProtocolKeyword getCommandType(String name) {
try {
return CommandType.valueOf(name);
} catch (IllegalArgumentException e) {
return new CustomCommandType(name);
}
}
/**
* {@link TypeHints} provide {@link CommandOutput} information for a given {@link CommandType}.
*
* @since 1.2.1
*/
static class TypeHints {
@SuppressWarnings("rawtypes") //
private static final Map<ProtocolKeyword, Class<? extends CommandOutput>> COMMAND_OUTPUT_TYPE_MAPPING = new HashMap<>();
@SuppressWarnings("rawtypes") //
private static final Map<Class<?>, Constructor<CommandOutput>> CONSTRUCTORS = new ConcurrentHashMap<>();
{
// INTEGER
COMMAND_OUTPUT_TYPE_MAPPING.put(BITCOUNT, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(BITOP, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(BITPOS, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(DBSIZE, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(DECR, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(DECRBY, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(DEL, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(COPY, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(GETBIT, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(HDEL, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(HINCRBY, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(HLEN, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(INCR, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(INCRBY, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(LINSERT, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(LLEN, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(LPUSH, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(LPOS, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(LPUSHX, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(LREM, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(PTTL, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(PUBLISH, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(RPUSH, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(RPUSHX, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SADD, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SCARD, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SDIFFSTORE, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SETBIT, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SETRANGE, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SINTERSTORE, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SREM, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SUNIONSTORE, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(STRLEN, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(TTL, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZADD, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZCOUNT, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZINTERSTORE, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZRANK, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZREM, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZREMRANGEBYRANK, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZREMRANGEBYSCORE, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZREVRANK, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZUNIONSTORE, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(PFCOUNT, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(PFMERGE, IntegerOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(PFADD, IntegerOutput.class);
// DOUBLE
COMMAND_OUTPUT_TYPE_MAPPING.put(HINCRBYFLOAT, DoubleOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(INCRBYFLOAT, DoubleOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(MGET, ValueListOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZINCRBY, DoubleOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZSCORE, DoubleOutput.class);
// DOUBLE LIST
COMMAND_OUTPUT_TYPE_MAPPING.put(ZMSCORE, DoubleListOutput.class);
// MAP
COMMAND_OUTPUT_TYPE_MAPPING.put(HGETALL, MapOutput.class);
// KEY LIST
COMMAND_OUTPUT_TYPE_MAPPING.put(HKEYS, KeyListOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(KEYS, KeyListOutput.class);
// KEY VALUE
COMMAND_OUTPUT_TYPE_MAPPING.put(BRPOP, KeyValueOutput.class);
// SINGLE VALUE
COMMAND_OUTPUT_TYPE_MAPPING.put(BRPOPLPUSH, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ECHO, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(GET, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(GETRANGE, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(GETSET, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(HGET, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(LINDEX, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(LPOP, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(RANDOMKEY, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(RENAME, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(RPOP, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(RPOPLPUSH, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SPOP, ValueOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SRANDMEMBER, ValueOutput.class);
// STATUS VALUE
COMMAND_OUTPUT_TYPE_MAPPING.put(BGREWRITEAOF, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(BGSAVE, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(CLIENT, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(DEBUG, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(DISCARD, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(FLUSHALL, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(FLUSHDB, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(HMSET, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(INFO, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(LSET, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(LTRIM, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(MIGRATE, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(MSET, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(QUIT, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(RESTORE, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SAVE, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SELECT, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SET, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SETEX, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SHUTDOWN, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SLAVEOF, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SYNC, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(TYPE, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(WATCH, StatusOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(UNWATCH, StatusOutput.class);
// VALUE LIST
COMMAND_OUTPUT_TYPE_MAPPING.put(HMGET, ValueListOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(MGET, ValueListOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(HVALS, ValueListOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(LRANGE, ValueListOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SORT, ValueListOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZRANGE, ValueListOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZRANGEBYSCORE, ValueListOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZREVRANGE, ValueListOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(ZREVRANGEBYSCORE, ValueListOutput.class);
// BOOLEAN
COMMAND_OUTPUT_TYPE_MAPPING.put(EXISTS, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(EXPIRE, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(EXPIREAT, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(HEXISTS, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(HSET, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(HSETNX, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(MOVE, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(COPY, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(MSETNX, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(PERSIST, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(PEXPIRE, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(PEXPIREAT, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(RENAMENX, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SETNX, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SISMEMBER, BooleanOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SMOVE, BooleanOutput.class);
// MULTI
COMMAND_OUTPUT_TYPE_MAPPING.put(EXEC, MultiOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(MULTI, MultiOutput.class);
// DATE
COMMAND_OUTPUT_TYPE_MAPPING.put(LASTSAVE, DateOutput.class);
// VALUE SET
COMMAND_OUTPUT_TYPE_MAPPING.put(SDIFF, ValueSetOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SINTER, ValueSetOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SMEMBERS, ValueSetOutput.class);
COMMAND_OUTPUT_TYPE_MAPPING.put(SUNION, ValueSetOutput.class);
}
/**
* Returns the {@link CommandOutput} mapped for given {@link CommandType} or {@link ByteArrayOutput} as default.
*
* @param type
* @return {@link ByteArrayOutput} as default when no matching {@link CommandOutput} available.
*/
@SuppressWarnings("rawtypes")
public CommandOutput getTypeHint(ProtocolKeyword type) {
return getTypeHint(type, new ByteArrayOutput<>(CODEC));
}
/**
* Returns the {@link CommandOutput} mapped for given {@link CommandType} given {@link CommandOutput} as default.
*
* @param type
* @return
*/
@SuppressWarnings("rawtypes")
public CommandOutput getTypeHint(ProtocolKeyword type, CommandOutput defaultType) {
if (type == null || !COMMAND_OUTPUT_TYPE_MAPPING.containsKey(type)) {
return defaultType;
}
CommandOutput<?, ?, ?> outputType = instanciateCommandOutput(COMMAND_OUTPUT_TYPE_MAPPING.get(type));
return outputType != null ? outputType : defaultType;
}
@SuppressWarnings({ "rawtypes", "unchecked" })
private CommandOutput<?, ?, ?> instanciateCommandOutput(Class<? extends CommandOutput> type) {
Assert.notNull(type, "Cannot create instance for 'null' type.");
Constructor<CommandOutput> constructor = CONSTRUCTORS.get(type);
if (constructor == null) {
constructor = (Constructor<CommandOutput>) ClassUtils.getConstructorIfAvailable(type, RedisCodec.class);
CONSTRUCTORS.put(type, constructor);
}
return BeanUtils.instantiateClass(constructor, CODEC);
}
}
/**
* Strategy interface to control pipelining flush behavior. Lettuce writes (flushes) each command individually to the
* Redis connection. Flushing behavior can be customized to optimize for performance. Flushing can be either stateless
* or stateful. An example for stateful flushing is size-based (buffer) flushing to flush after a configured number of
* commands.
*
* @see StatefulRedisConnection#setAutoFlushCommands(boolean)
* @see StatefulRedisConnection#flushCommands()
* @author Mark Paluch
* @since 2.3
*/
public interface PipeliningFlushPolicy {
/**
* Return a policy to flush after each command (default behavior).
*
* @return a policy to flush after each command.
*/
static PipeliningFlushPolicy flushEachCommand() {
return FlushEachCommand.INSTANCE;
}
/**
* Return a policy to flush only if {@link #closePipeline()} is called.
*
* @return a policy to flush after each command.
*/
static PipeliningFlushPolicy flushOnClose() {
return FlushOnClose.INSTANCE;
}
/**
* Return a policy to buffer commands and to flush once reaching the configured {@code bufferSize}. The buffer is
* recurring so a buffer size of e.g. {@code 2} will flush after 2, 4, 6, … commands.
*
* @param bufferSize the number of commands to buffer before flushing. Must be greater than zero.
* @return a policy to flush buffered commands to the Redis connection once the configured number of commands was
* issued.
*/
static PipeliningFlushPolicy buffered(int bufferSize) {
Assert.isTrue(bufferSize > 0, "Buffer size must be greater than 0");
return () -> new BufferedFlushing(bufferSize);
}
PipeliningFlushState newPipeline();
}
/**
* State object associated with flushing of the currently ongoing pipeline.
*
* @author Mark Paluch
* @since 2.3
*/
public interface PipeliningFlushState {
/**
* Callback if the pipeline gets opened.
*
* @param connection
* @see #openPipeline()
*/
void onOpen(StatefulConnection<?, ?> connection);
/**
* Callback for each issued Redis command.
*
* @param connection
* @see #pipeline(LettuceResult)
*/
void onCommand(StatefulConnection<?, ?> connection);
/**
* Callback if the pipeline gets closed.
*
* @param connection
* @see #closePipeline()
*/
void onClose(StatefulConnection<?, ?> connection);
}
/**
* Implementation to flush on each command.
*
* @author Mark Paluch
* @since 2.3
*/
private enum FlushEachCommand implements PipeliningFlushPolicy, PipeliningFlushState {
INSTANCE;
@Override
public PipeliningFlushState newPipeline() {
return INSTANCE;
}
@Override
public void onOpen(StatefulConnection<?, ?> connection) {}
@Override
public void onCommand(StatefulConnection<?, ?> connection) {}
@Override
public void onClose(StatefulConnection<?, ?> connection) {}
}
/**
* Implementation to flush on closing the pipeline.
*
* @author Mark Paluch
* @since 2.3
*/
private enum FlushOnClose implements PipeliningFlushPolicy, PipeliningFlushState {
INSTANCE;
@Override
public PipeliningFlushState newPipeline() {
return INSTANCE;
}
@Override
public void onOpen(StatefulConnection<?, ?> connection) {
connection.setAutoFlushCommands(false);
}
@Override
public void onCommand(StatefulConnection<?, ?> connection) {
}
@Override
public void onClose(StatefulConnection<?, ?> connection) {
connection.flushCommands();
connection.setAutoFlushCommands(true);
}
}
/**
* Pipeline state for buffered flushing.
*
* @author Mark Paluch
* @since 2.3
*/
private static class BufferedFlushing implements PipeliningFlushState {
private final AtomicLong commands = new AtomicLong();
private final int flushAfter;
public BufferedFlushing(int flushAfter) {
this.flushAfter = flushAfter;
}
@Override
public void onOpen(StatefulConnection<?, ?> connection) {
connection.setAutoFlushCommands(false);
}
@Override
public void onCommand(StatefulConnection<?, ?> connection) {
if (commands.incrementAndGet() % flushAfter == 0) {
connection.flushCommands();
}
}
@Override
public void onClose(StatefulConnection<?, ?> connection) {
connection.flushCommands();
connection.setAutoFlushCommands(true);
}
}
/**
* @since 2.3.8
*/
static class CustomCommandType implements ProtocolKeyword {
private final String name;
CustomCommandType(String name) {
this.name = name;
}
@Override
public byte[] getBytes() {
return name.getBytes(StandardCharsets.US_ASCII);
}
@Override
public String name() {
return name;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof CustomCommandType)) {
return false;
}
CustomCommandType that = (CustomCommandType) o;
return ObjectUtils.nullSafeEquals(name, that.name);
}
@Override
public int hashCode() {
return ObjectUtils.nullSafeHashCode(name);
}
@Override
public String toString() {
return name;
}
}
}
相关信息
相关文章
spring-data-redis ClusterConnectionProvider 源码
spring-data-redis DefaultLettuceClientConfiguration 源码
spring-data-redis DefaultLettucePoolingClientConfiguration 源码
spring-data-redis LettuceByteBufferPubSubListenerWrapper 源码
spring-data-redis LettuceClientConfiguration 源码
spring-data-redis LettuceClusterConnection 源码
spring-data-redis LettuceClusterGeoCommands 源码
spring-data-redis LettuceClusterHashCommands 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦