spring OrderedMessageSendingIntegrationTests 源码

  • 2022-08-08
  • 浏览 (356)

spring OrderedMessageSendingIntegrationTests 代码

文件路径:/spring-websocket/src/test/java/org/springframework/web/socket/messaging/OrderedMessageSendingIntegrationTests.java

/*
 * Copyright 2002-2020 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.web.socket.messaging;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.simp.broker.OrderedMessageChannelDecorator;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompEncoder;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ExecutorSubscribableChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.BlockingWebSocketSession;
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;

import static org.assertj.core.api.Assertions.assertThat;

/**
 * Tests to publish messages to an Executor backed channel wrapped with
 * {@link OrderedMessageChannelDecorator} and handled by
 * {@link StompSubProtocolHandler} delegating to a
 * {@link ConcurrentWebSocketSessionDecorator} wrapped session.
 *
 * <p>The tests verify that:
 * <ul>
 * <li>messages are executed in the same order as they are published.
 * <li>send buffer size and send time limits at the
 * {@link ConcurrentWebSocketSessionDecorator} level are enforced.
 * </ul>
 *
 * <p>The key is for {@link OrderedMessageChannelDecorator} to release the next
 * message when after the current one is queued for sending, and not after it is
 * sent, which may block and cause messages to accumulate in the
 * {@link OrderedMessageChannelDecorator} instead of in
 * {@link ConcurrentWebSocketSessionDecorator} where send limits are enforced.
 *
 * @author Rossen Stoyanchev
 */
public class OrderedMessageSendingIntegrationTests {

	private static final Log logger = LogFactory.getLog(OrderedMessageSendingIntegrationTests.class);

	private static final int MESSAGE_SIZE = new StompEncoder().encode(createMessage(0)).length;


	private BlockingWebSocketSession blockingSession;

	private ExecutorSubscribableChannel subscribableChannel;

	private OrderedMessageChannelDecorator orderedMessageChannel;

	private ThreadPoolTaskExecutor executor;



	@BeforeEach
	public void setup() {
		this.blockingSession = new BlockingWebSocketSession();
		this.blockingSession.setId("1");
		this.blockingSession.setOpen(true);

		this.executor = new ThreadPoolTaskExecutor();
		this.executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);
		this.executor.setAllowCoreThreadTimeOut(true);
		this.executor.afterPropertiesSet();

		this.subscribableChannel = new ExecutorSubscribableChannel(this.executor);
		OrderedMessageChannelDecorator.configureInterceptor(this.subscribableChannel, true);

		this.orderedMessageChannel = new OrderedMessageChannelDecorator(this.subscribableChannel, logger);
	}

	@AfterEach
	public void tearDown() {
		this.executor.shutdown();
	}

	@Test
	void sendAfterBlockedSend() throws InterruptedException {

		int messageCount = 1000;

		ConcurrentWebSocketSessionDecorator concurrentSessionDecorator =
				new ConcurrentWebSocketSessionDecorator(
						this.blockingSession, 60 * 1000, messageCount * MESSAGE_SIZE);

		TestMessageHandler handler = new TestMessageHandler(concurrentSessionDecorator);
		subscribableChannel.subscribe(handler);

		List<Message<?>> expectedMessages = new ArrayList<>(messageCount);

		// Send one to block
		Message<byte[]> message = createMessage(0);
		expectedMessages.add(message);
		this.orderedMessageChannel.send(message);

		CountDownLatch latch = new CountDownLatch(messageCount);
		handler.setMessageLatch(latch);

		for (int i = 1; i <= messageCount; i++) {
			message = createMessage(i);
			expectedMessages.add(message);
			this.orderedMessageChannel.send(message);
		}

		latch.await(5, TimeUnit.SECONDS);

		assertThat(concurrentSessionDecorator.getTimeSinceSendStarted() > 0).isTrue();
		assertThat(concurrentSessionDecorator.getBufferSize()).isEqualTo((messageCount * MESSAGE_SIZE));
		assertThat(handler.getSavedMessages()).containsExactlyElementsOf(expectedMessages);
		assertThat(blockingSession.isOpen()).isTrue();
	}

	@Test
	void exceedTimeLimit() throws InterruptedException {

		ConcurrentWebSocketSessionDecorator concurrentSessionDecorator =
				new ConcurrentWebSocketSessionDecorator(this.blockingSession, 100, 1024);

		TestMessageHandler messageHandler = new TestMessageHandler(concurrentSessionDecorator);
		subscribableChannel.subscribe(messageHandler);

		// Send one to block
		this.orderedMessageChannel.send(createMessage(0));

		// Exceed send time..
		Thread.sleep(200);

		CountDownLatch messageLatch = new CountDownLatch(1);
		messageHandler.setMessageLatch(messageLatch);

		// Send one more
		this.orderedMessageChannel.send(createMessage(1));

		messageLatch.await(5, TimeUnit.SECONDS);

		assertThat(messageHandler.getSavedException()).hasMessageMatching(
				"Send time [\\d]+ \\(ms\\) for session '1' exceeded the allowed limit 100");
	}

	@Test
	void exceedBufferSizeLimit() throws InterruptedException {

		ConcurrentWebSocketSessionDecorator concurrentSessionDecorator =
				new ConcurrentWebSocketSessionDecorator(this.blockingSession, 60 * 1000, 2 * MESSAGE_SIZE);

		TestMessageHandler messageHandler = new TestMessageHandler(concurrentSessionDecorator);
		subscribableChannel.subscribe(messageHandler);

		// Send one to block
		this.orderedMessageChannel.send(createMessage(0));

		int messageCount = 3;
		CountDownLatch messageLatch = new CountDownLatch(messageCount);
		messageHandler.setMessageLatch(messageLatch);

		for (int i = 1; i <= messageCount; i++) {
			this.orderedMessageChannel.send(createMessage(i));
		}

		messageLatch.await(5, TimeUnit.SECONDS);

		assertThat(messageHandler.getSavedException()).hasMessage(
				"Buffer size " + 3 * MESSAGE_SIZE + " bytes for session '1' exceeds the allowed limit " + 2 * MESSAGE_SIZE);
	}

	private static Message<byte[]> createMessage(int index) {
		StompHeaderAccessor accessor = StompHeaderAccessor.create(StompCommand.MESSAGE);
		accessor.setHeader("index", index);
		accessor.setSubscriptionId("1");
		accessor.setLeaveMutable(true);
		byte[] bytes = "payload".getBytes(StandardCharsets.UTF_8);
		return MessageBuilder.createMessage(bytes, accessor.getMessageHeaders());

	}


	private static class TestMessageHandler implements MessageHandler {

		private final StompSubProtocolHandler subProtocolHandler = new StompSubProtocolHandler();

		private final WebSocketSession session;

		@Nullable
		private CountDownLatch messageLatch;

		private Queue<Message<?>> messages = new LinkedBlockingQueue<>();

		private AtomicReference<Exception> exception = new AtomicReference<>();


		public TestMessageHandler(WebSocketSession session) {
			this.session = session;
		}

		public void setMessageLatch(CountDownLatch latch) {
			this.messageLatch = latch;
		}

		public Collection<Message<?>> getSavedMessages() {
			return this.messages;
		}

		public Exception getSavedException() {
			return this.exception.get();
		}

		@Override
		public void handleMessage(Message<?> message) throws MessagingException {
			this.messages.add(message);
			try {
				this.subProtocolHandler.handleMessageToClient(this.session, message);
			}
			catch (Exception ex) {
				this.exception.set(ex);
			}
			if (this.messageLatch != null) {
				this.messageLatch.countDown();
			}
		}
	}
}

相关信息

spring 源码目录

相关文章

spring DefaultSimpUserRegistryTests 源码

spring StompSubProtocolErrorHandlerTests 源码

spring StompSubProtocolHandlerTests 源码

spring StompTextMessageBuilder 源码

spring StompWebSocketIntegrationTests 源码

spring SubProtocolWebSocketHandlerTests 源码

spring WebSocketAnnotationMethodMessageHandlerTests 源码

spring WebSocketStompClientIntegrationTests 源码

spring WebSocketStompClientTests 源码

0  赞