spring-data-redis LettuceReactiveStreamCommands 源码
spring-data-redis LettuceReactiveStreamCommands 代码
文件路径:/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveStreamCommands.java
/*
* Copyright 2018-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.redis.connection.lettuce;
import io.lettuce.core.XAddArgs;
import io.lettuce.core.XClaimArgs;
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.XReadArgs.StreamOffset;
import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
import io.lettuce.core.models.stream.PendingMessage;
import reactor.core.publisher.Flux;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.KeyCommand;
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
import org.springframework.data.redis.connection.ReactiveStreamCommands;
import org.springframework.data.redis.connection.ReactiveStreamCommands.GroupCommand.GroupCommandAction;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoConsumer;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoGroup;
import org.springframework.data.redis.connection.stream.StreamInfo.XInfoStream;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.util.ByteUtils;
import org.springframework.util.Assert;
/**
* {@link ReactiveStreamCommands} implementation for {@literal Lettuce}.
*
* @author Mark Paluch
* @author Tugdual Grall
* @author Dengliming
* @author Mark John Moreno
* @since 2.2
*/
class LettuceReactiveStreamCommands implements ReactiveStreamCommands {
private final LettuceReactiveRedisConnection connection;
/**
* Create new {@link LettuceReactiveStreamCommands}.
*
* @param connection must not be {@literal null}.
*/
LettuceReactiveStreamCommands(LettuceReactiveRedisConnection connection) {
Assert.notNull(connection, "Connection must not be null");
this.connection = connection;
}
@Override
public Flux<NumericResponse<AcknowledgeCommand, Long>> xAck(Publisher<AcknowledgeCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getGroup(), "Group must not be null");
Assert.notNull(command.getRecordIds(), "recordIds must not be null");
return cmd
.xack(command.getKey(), ByteUtils.getByteBuffer(command.getGroup()), entryIdsToString(command.getRecordIds()))
.map(value -> new NumericResponse<>(command, value));
}));
}
@Override
public Flux<CommandResponse<AddStreamRecord, RecordId>> xAdd(Publisher<AddStreamRecord> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getBody(), "Body must not be null");
XAddArgs args = new XAddArgs();
if (!command.getRecord().getId().shouldBeAutoGenerated()) {
args.id(command.getRecord().getId().getValue());
}
if (command.hasMaxlen()) {
args.maxlen(command.getMaxlen());
}
if (command.hasMinId()) {
args.minId(command.getMinId().getValue());
}
args.nomkstream(command.isNoMkStream());
args.approximateTrimming(command.isApproximateTrimming());
return cmd.xadd(command.getKey(), args, command.getBody())
.map(value -> new CommandResponse<>(command, RecordId.of(value)));
}));
}
@Override
public Flux<CommandResponse<XClaimCommand, Flux<RecordId>>> xClaimJustId(Publisher<XClaimCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).map(command -> {
String[] ids = command.getOptions().getIdsAsStringArray();
io.lettuce.core.Consumer<ByteBuffer> from = io.lettuce.core.Consumer
.from(ByteUtils.getByteBuffer(command.getGroupName()), ByteUtils.getByteBuffer(command.getNewOwner()));
XClaimArgs args = StreamConverters.toXClaimArgs(command.getOptions()).justid();
Flux<RecordId> result = cmd.xclaim(command.getKey(), from, args, ids).map(it -> RecordId.of(it.getId()));
return new CommandResponse<>(command, result);
}));
}
@Override
public Flux<CommandResponse<XClaimCommand, Flux<ByteBufferRecord>>> xClaim(Publisher<XClaimCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).map(command -> {
String[] ids = command.getOptions().getIdsAsStringArray();
io.lettuce.core.Consumer<ByteBuffer> from = io.lettuce.core.Consumer
.from(ByteUtils.getByteBuffer(command.getGroupName()), ByteUtils.getByteBuffer(command.getNewOwner()));
XClaimArgs args = StreamConverters.toXClaimArgs(command.getOptions());
Flux<ByteBufferRecord> result = cmd.xclaim(command.getKey(), from, args, ids)
.map(it -> StreamRecords.newRecord().in(it.getStream()).withId(it.getId()).ofBuffer(it.getBody()));
return new CommandResponse<>(command, result);
}));
}
@Override
public Flux<CommandResponse<DeleteCommand, Long>> xDel(Publisher<DeleteCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getRecordIds(), "recordIds must not be null");
return cmd.xdel(command.getKey(), entryIdsToString(command.getRecordIds()))
.map(value -> new NumericResponse<>(command, value));
}));
}
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public Flux<CommandResponse<GroupCommand, String>> xGroup(Publisher<GroupCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getGroupName(), "GroupName must not be null");
if (command.getAction().equals(GroupCommandAction.CREATE)) {
Assert.notNull(command.getReadOffset(), "ReadOffset must not be null");
StreamOffset offset = StreamOffset.from(command.getKey(), command.getReadOffset().getOffset());
return cmd.xgroupCreate(offset,
ByteUtils.getByteBuffer(command.getGroupName()),
XGroupCreateArgs.Builder.mkstream(command.isMkStream()))
.map(it ->
new CommandResponse<>(command, it)
);
}
if (command.getAction().equals(GroupCommandAction.DELETE_CONSUMER)) {
return cmd
.xgroupDelconsumer(command.getKey(),
io.lettuce.core.Consumer.from(ByteUtils.getByteBuffer(command.getGroupName()),
ByteUtils.getByteBuffer(command.getConsumerName())))
.map(it -> new CommandResponse<>(command, "OK"));
}
if (command.getAction().equals(GroupCommandAction.DESTROY)) {
return cmd.xgroupDestroy(command.getKey(), ByteUtils.getByteBuffer(command.getGroupName()))
.map(it -> new CommandResponse<>(command, Boolean.TRUE.equals(it) ? "OK" : "Error"));
}
throw new IllegalArgumentException("Unknown group command " + command.getAction());
}));
}
@Override
public Flux<NumericResponse<KeyCommand, Long>> xLen(Publisher<KeyCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
return cmd.xlen(command.getKey()).map(value -> new NumericResponse<>(command, value));
}));
}
@Override
public Flux<CommandResponse<PendingRecordsCommand, PendingMessagesSummary>> xPendingSummary(
Publisher<PendingRecordsCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
return cmd.xpending(command.getKey(), ByteUtils.getByteBuffer(command.getGroupName())).map(it -> {
return StreamConverters.toPendingMessagesInfo(command.getGroupName(), it);
}).map(value -> new CommandResponse<>(command, value));
}));
}
@Override
public Flux<CommandResponse<PendingRecordsCommand, PendingMessages>> xPending(
Publisher<PendingRecordsCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
ByteBuffer groupName = ByteUtils.getByteBuffer(command.getGroupName());
io.lettuce.core.Range<String> range = RangeConverter.toRangeWithDefault(command.getRange(), "-", "+");
io.lettuce.core.Limit limit = command.isLimited() ? io.lettuce.core.Limit.from(command.getCount())
: io.lettuce.core.Limit.unlimited();
Flux<PendingMessage> publisher = command.hasConsumer() ? cmd.xpending(command.getKey(),
io.lettuce.core.Consumer.from(groupName, ByteUtils.getByteBuffer(command.getConsumerName())), range, limit)
: cmd.xpending(command.getKey(), groupName, range, limit);
return publisher.collectList().map(it -> {
return StreamConverters.toPendingMessages(command.getGroupName(), command.getRange(), it);
}).map(value -> new CommandResponse<>(command, value));
}));
}
@Override
public Flux<CommandResponse<RangeCommand, Flux<ByteBufferRecord>>> xRange(Publisher<RangeCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).map(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getRange(), "Range must not be null");
Assert.notNull(command.getLimit(), "Limit must not be null");
io.lettuce.core.Range<String> lettuceRange = RangeConverter.toRange(command.getRange(), Function.identity());
io.lettuce.core.Limit lettuceLimit = LettuceConverters.toLimit(command.getLimit());
return new CommandResponse<>(command, cmd.xrange(command.getKey(), lettuceRange, lettuceLimit)
.map(it -> StreamRecords.newRecord().in(it.getStream()).withId(it.getId()).ofBuffer(it.getBody())));
}));
}
@Override
public Flux<CommandResponse<ReadCommand, Flux<ByteBufferRecord>>> read(Publisher<ReadCommand> commands) {
return Flux.from(commands).map(command -> {
Assert.notNull(command.getStreamOffsets(), "StreamOffsets must not be null");
Assert.notNull(command.getReadOptions(), "ReadOptions must not be null");
StreamReadOptions readOptions = command.getReadOptions();
if (readOptions.isBlocking()) {
return new CommandResponse<>(command, connection.executeDedicated(cmd -> doRead(command, readOptions, cmd)));
}
return new CommandResponse<>(command, connection.execute(cmd -> doRead(command, readOptions, cmd)));
});
}
private static Flux<ByteBufferRecord> doRead(ReadCommand command, StreamReadOptions readOptions,
RedisClusterReactiveCommands<ByteBuffer, ByteBuffer> cmd) {
StreamOffset<ByteBuffer>[] streamOffsets = toStreamOffsets(command.getStreamOffsets());
XReadArgs args = StreamConverters.toReadArgs(readOptions);
if (command.getConsumer() == null) {
return cmd.xread(args, streamOffsets)
.map(it -> StreamRecords.newRecord().in(it.getStream()).withId(it.getId()).ofBuffer(it.getBody()));
}
io.lettuce.core.Consumer<ByteBuffer> lettuceConsumer = toConsumer(command.getConsumer());
return cmd.xreadgroup(lettuceConsumer, args, streamOffsets)
.map(it -> StreamRecords.newRecord().in(it.getStream()).withId(it.getId()).ofBuffer(it.getBody()));
}
@Override
public Flux<CommandResponse<XInfoCommand, XInfoStream>> xInfo(Publisher<XInfoCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
return cmd.xinfoStream(command.getKey()).collectList().map(XInfoStream::fromList)
.map(it -> new CommandResponse<>(command, it));
}));
}
@Override
public Flux<CommandResponse<XInfoCommand, Flux<XInfoGroup>>> xInfoGroups(Publisher<XInfoCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).map(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
return new CommandResponse<>(command,
cmd.xinfoGroups(command.getKey()).map(it -> XInfoGroup.fromList((List<Object>) it)));
}));
}
@Override
public Flux<CommandResponse<XInfoCommand, Flux<XInfoConsumer>>> xInfoConsumers(Publisher<XInfoCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).map(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
ByteBuffer groupName = ByteUtils.getByteBuffer(command.getGroupName());
return new CommandResponse<>(command, cmd.xinfoConsumers(command.getKey(), groupName)
.map(it -> new XInfoConsumer(command.getGroupName(), (List<Object>) it)));
}));
}
@Override
public Flux<CommandResponse<RangeCommand, Flux<ByteBufferRecord>>> xRevRange(Publisher<RangeCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).map(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getRange(), "Range must not be null");
Assert.notNull(command.getLimit(), "Limit must not be null");
io.lettuce.core.Range<String> lettuceRange = RangeConverter.toRange(command.getRange(), Function.identity());
io.lettuce.core.Limit lettuceLimit = LettuceConverters.toLimit(command.getLimit());
return new CommandResponse<>(command, cmd.xrevrange(command.getKey(), lettuceRange, lettuceLimit)
.map(it -> StreamRecords.newRecord().in(it.getStream()).withId(it.getId()).ofBuffer(it.getBody())));
}));
}
@Override
public Flux<NumericResponse<KeyCommand, Long>> xTrim(Publisher<TrimCommand> commands) {
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
Assert.notNull(command.getKey(), "Key must not be null");
Assert.notNull(command.getCount(), "Count must not be null");
return cmd.xtrim(command.getKey(), command.isApproximateTrimming(), command.getCount()).map(value -> new NumericResponse<>(command, value));
}));
}
@SuppressWarnings("unchecked")
private static <T> StreamOffset<T>[] toStreamOffsets(
Collection<org.springframework.data.redis.connection.stream.StreamOffset<T>> streams) {
return streams.stream().map(it -> StreamOffset.from(it.getKey(), it.getOffset().getOffset()))
.toArray(StreamOffset[]::new);
}
private static io.lettuce.core.Consumer<ByteBuffer> toConsumer(Consumer consumer) {
return io.lettuce.core.Consumer.from(ByteUtils.getByteBuffer(consumer.getGroup()),
ByteUtils.getByteBuffer(consumer.getName()));
}
private static String[] entryIdsToString(List<RecordId> recordIds) {
if (recordIds.size() == 1) {
return new String[] { recordIds.get(0).getValue() };
}
return recordIds.stream().map(RecordId::getValue).toArray(String[]::new);
}
}
相关信息
相关文章
spring-data-redis ClusterConnectionProvider 源码
spring-data-redis DefaultLettuceClientConfiguration 源码
spring-data-redis DefaultLettucePoolingClientConfiguration 源码
spring-data-redis LettuceByteBufferPubSubListenerWrapper 源码
spring-data-redis LettuceClientConfiguration 源码
spring-data-redis LettuceClusterConnection 源码
spring-data-redis LettuceClusterGeoCommands 源码
spring-data-redis LettuceClusterHashCommands 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦