spring-data-redis LettuceReactiveClusterSetCommands 源码

  • 2022-08-16
  • 浏览 (448)

spring-data-redis LettuceReactiveClusterSetCommands 代码

文件路径:/src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveClusterSetCommands.java

/*
 * Copyright 2016-2022 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.data.redis.connection.lettuce;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

import org.reactivestreams.Publisher;
import org.springframework.data.redis.connection.ClusterSlotHashUtil;
import org.springframework.data.redis.connection.ReactiveClusterSetCommands;
import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
import org.springframework.util.Assert;

/**
 * @author Christoph Strobl
 * @author Mark Paluch
 * @since 2.0
 */
class LettuceReactiveClusterSetCommands extends LettuceReactiveSetCommands implements ReactiveClusterSetCommands {

	/**
	 * Create new {@link LettuceReactiveClusterSetCommands}.
	 *
	 * @param connection must not be {@literal null}.
	 */
	LettuceReactiveClusterSetCommands(LettuceReactiveRedisConnection connection) {
		super(connection);
	}

	@Override
	public Flux<CommandResponse<SUnionCommand, Flux<ByteBuffer>>> sUnion(Publisher<SUnionCommand> commands) {

		return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

			Assert.notNull(command.getKeys(), "Keys must not be null");

			if (ClusterSlotHashUtil.isSameSlotForAllKeys(command.getKeys())) {
				return super.sUnion(Mono.just(command));
			}

			Flux<ByteBuffer> result = Flux.merge(command.getKeys().stream().map(cmd::smembers).collect(Collectors.toList()))
					.distinct();

			return Mono.just(new CommandResponse<>(command, result));
		}));
	}

	@Override
	public Flux<NumericResponse<SUnionStoreCommand, Long>> sUnionStore(Publisher<SUnionStoreCommand> commands) {

		return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

			Assert.notNull(command.getKeys(), "Source keys must not be null");
			Assert.notNull(command.getKey(), "Destination key must not be null");

			List<ByteBuffer> keys = new ArrayList<>(command.getKeys());
			keys.add(command.getKey());

			if (ClusterSlotHashUtil.isSameSlotForAllKeys(keys)) {
				return super.sUnionStore(Mono.just(command));
			}

			return sUnion(Mono.just(SUnionCommand.keys(command.getKeys()))).next().flatMap(values -> {

				Mono<Long> result = values.getOutput().collectList().flatMap(it -> {

					ByteBuffer[] members = it.toArray(new ByteBuffer[0]);
					return cmd.sadd(command.getKey(), members);
				});

				return result.map(value -> new NumericResponse<>(command, value));
			});
		}));
	}

	@Override
	public Flux<CommandResponse<SInterCommand, Flux<ByteBuffer>>> sInter(Publisher<SInterCommand> commands) {

		return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

			Assert.notNull(command.getKeys(), "Keys must not be null");

			if (ClusterSlotHashUtil.isSameSlotForAllKeys(command.getKeys())) {
				return super.sInter(Mono.just(command));
			}

			Mono<List<ByteBuffer>> sourceSet = cmd.smembers(command.getKeys().get(0)).distinct().collectList();

			List<Mono<List<ByteBuffer>>> intersectingSets = new ArrayList<>();

			for (int i = 1; i < command.getKeys().size(); i++) {
				intersectingSets.add(cmd.smembers(command.getKeys().get(i)).distinct().collectList());
			}

			Flux<List<ByteBuffer>> result = Flux.zip(sourceSet, Flux.merge(intersectingSets).collectList(),
					(source, intersectings) -> {

						for (List<ByteBuffer> intersecting : intersectings) {
							source.retainAll(intersecting);
						}
				return source;
			});

			return Mono.just(new CommandResponse<>(command, result.concatMap(v -> Flux.fromStream(v.stream()))));
		}));
	}

	@Override
	public Flux<NumericResponse<SInterStoreCommand, Long>> sInterStore(Publisher<SInterStoreCommand> commands) {

		return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

			Assert.notNull(command.getKeys(), "Source keys must not be null");
			Assert.notNull(command.getKey(), "Destination key must not be null");

			List<ByteBuffer> keys = new ArrayList<>(command.getKeys());
			keys.add(command.getKey());

			if (ClusterSlotHashUtil.isSameSlotForAllKeys(keys)) {
				return super.sInterStore(Mono.just(command));
			}

			return sInter(Mono.just(SInterCommand.keys(command.getKeys()))).next().flatMap(values -> {

				Mono<Long> result = values.getOutput().collectList().flatMap(it -> {

					ByteBuffer[] members = it.toArray(new ByteBuffer[0]);
					return cmd.sadd(command.getKey(), members);
				});

				return result.map(value -> new NumericResponse<>(command, value));
			});
		}));
	}

	@Override
	public Flux<CommandResponse<SDiffCommand, Flux<ByteBuffer>>> sDiff(Publisher<SDiffCommand> commands) {

		return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

			Assert.notNull(command.getKeys(), "Keys must not be null");

			if (ClusterSlotHashUtil.isSameSlotForAllKeys(command.getKeys())) {
				return super.sDiff(Mono.just(command));
			}

			Mono<List<ByteBuffer>> sourceSet = cmd.smembers(command.getKeys().get(0)).distinct().collectList();

			List<Mono<List<ByteBuffer>>> intersectingSets = new ArrayList<>();

			for (int i = 1; i < command.getKeys().size(); i++) {
				intersectingSets.add(cmd.smembers(command.getKeys().get(i)).distinct().collectList());
			}

			Flux<List<ByteBuffer>> result = Flux.zip(sourceSet, Flux.merge(intersectingSets).collectList(),
					(source, intersectings) -> {

						for (List<ByteBuffer> intersecting : intersectings) {
							source.removeAll(intersecting);
						}

				return source;
			});

			return Mono.just(new CommandResponse<>(command, result.concatMap(v -> Flux.fromStream(v.stream()))));

		}));
	}

	@Override
	public Flux<NumericResponse<SDiffStoreCommand, Long>> sDiffStore(Publisher<SDiffStoreCommand> commands) {

		return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

			Assert.notNull(command.getKeys(), "Source keys must not be null");
			Assert.notNull(command.getKey(), "Destination key must not be null");

			List<ByteBuffer> keys = new ArrayList<>(command.getKeys());
			keys.add(command.getKey());

			if (ClusterSlotHashUtil.isSameSlotForAllKeys(keys)) {
				return super.sDiffStore(Mono.just(command));
			}

			return sDiff(Mono.just(SDiffCommand.keys(command.getKeys()))).next().flatMap(values -> {

				Mono<Long> result = values.getOutput().collectList().flatMap(it -> {

					ByteBuffer[] members = it.toArray(new ByteBuffer[0]);
					return cmd.sadd(command.getKey(), members);
				});

				return result.map(value -> new NumericResponse<>(command, value));
			});
		}));
	}

	@Override
	public Flux<BooleanResponse<SMoveCommand>> sMove(Publisher<SMoveCommand> commands) {

		return getConnection().execute(cmd -> Flux.from(commands).concatMap(command -> {

			Assert.notNull(command.getKey(), "Source key must not be null");
			Assert.notNull(command.getDestination(), "Destination key must not be null");

			if (ClusterSlotHashUtil.isSameSlotForAllKeys(command.getKey(), command.getDestination())) {
				return super.sMove(Mono.just(command));
			}

			Mono<Boolean> result = cmd.exists(command.getKey())
					.flatMap(nrKeys -> nrKeys == 0 ? Mono.empty() : cmd.sismember(command.getKey(), command.getValue()))
					.flatMap(exists -> {

						if (!exists) {
							return Mono.just(Boolean.FALSE);
						}
						return cmd.sismember(command.getDestination(), command.getValue()).flatMap(existsInTarget -> {

							Mono<Boolean> tmp = cmd.srem(command.getKey(), command.getValue()).map(nrRemoved -> nrRemoved > 0);
							if (!existsInTarget) {
								return tmp.flatMap(removed -> cmd.sadd(command.getDestination(), command.getValue())
										.map(LettuceConverters::toBoolean));
							}
							return tmp;
						});

					});

			return result.defaultIfEmpty(Boolean.FALSE).map(value -> new BooleanResponse<>(command, value));
		}));
	}
}

相关信息

spring-data-redis 源码目录

相关文章

spring-data-redis ClusterConnectionProvider 源码

spring-data-redis DefaultLettuceClientConfiguration 源码

spring-data-redis DefaultLettucePoolingClientConfiguration 源码

spring-data-redis LettuceByteBufferPubSubListenerWrapper 源码

spring-data-redis LettuceClientConfiguration 源码

spring-data-redis LettuceClusterConnection 源码

spring-data-redis LettuceClusterGeoCommands 源码

spring-data-redis LettuceClusterHashCommands 源码

spring-data-redis LettuceClusterHyperLogLogCommands 源码

spring-data-redis LettuceClusterKeyCommands 源码

0  赞