spring ReactiveTypeHandlerTests 源码
spring ReactiveTypeHandlerTests 代码
文件路径:/spring-webmvc/src/test/java/org/springframework/web/servlet/mvc/method/annotation/ReactiveTypeHandlerTests.java
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.servlet.mvc.method.annotation;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleEmitter;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.ResolvableType;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.http.MediaType;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.http.server.ServletServerHttpResponse;
import org.springframework.web.accept.ContentNegotiationManager;
import org.springframework.web.accept.ContentNegotiationManagerFactoryBean;
import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.context.request.ServletWebRequest;
import org.springframework.web.context.request.async.AsyncWebRequest;
import org.springframework.web.context.request.async.StandardServletAsyncWebRequest;
import org.springframework.web.context.request.async.WebAsyncUtils;
import org.springframework.web.method.support.ModelAndViewContainer;
import org.springframework.web.servlet.HandlerMapping;
import org.springframework.web.testfixture.servlet.MockHttpServletRequest;
import org.springframework.web.testfixture.servlet.MockHttpServletResponse;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.core.ResolvableType.forClass;
import static org.springframework.web.testfixture.method.ResolvableMethod.on;
/**
* Unit tests for {@link ReactiveTypeHandler}.
* @author Rossen Stoyanchev
*/
public class ReactiveTypeHandlerTests {
private ReactiveTypeHandler handler;
private MockHttpServletRequest servletRequest;
private MockHttpServletResponse servletResponse;
private NativeWebRequest webRequest;
@BeforeEach
public void setup() throws Exception {
ContentNegotiationManagerFactoryBean factoryBean = new ContentNegotiationManagerFactoryBean();
factoryBean.afterPropertiesSet();
ContentNegotiationManager manager = factoryBean.getObject();
ReactiveAdapterRegistry adapterRegistry = ReactiveAdapterRegistry.getSharedInstance();
this.handler = new ReactiveTypeHandler(adapterRegistry, new SyncTaskExecutor(), manager);
resetRequest();
}
private void resetRequest() {
this.servletRequest = new MockHttpServletRequest();
this.servletResponse = new MockHttpServletResponse();
this.webRequest = new ServletWebRequest(this.servletRequest, this.servletResponse);
AsyncWebRequest webRequest = new StandardServletAsyncWebRequest(this.servletRequest, this.servletResponse);
WebAsyncUtils.getAsyncManager(this.webRequest).setAsyncWebRequest(webRequest);
this.servletRequest.setAsyncSupported(true);
}
@Test
public void supportsType() throws Exception {
assertThat(this.handler.isReactiveType(Mono.class)).isTrue();
assertThat(this.handler.isReactiveType(Single.class)).isTrue();
}
@Test
public void doesNotSupportType() throws Exception {
assertThat(this.handler.isReactiveType(String.class)).isFalse();
}
@Test
public void deferredResultSubscriberWithOneValue() throws Exception {
// Mono
Sinks.One<String> sink = Sinks.one();
testDeferredResultSubscriber(
sink.asMono(), Mono.class, forClass(String.class),
() -> sink.emitValue("foo", Sinks.EmitFailureHandler.FAIL_FAST),
"foo");
// Mono empty
Sinks.One<String> emptySink = Sinks.one();
testDeferredResultSubscriber(
emptySink.asMono(), Mono.class, forClass(String.class),
() -> emptySink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST),
null);
// RxJava Single
AtomicReference<SingleEmitter<String>> ref2 = new AtomicReference<>();
Single<String> single2 = Single.create(ref2::set);
testDeferredResultSubscriber(single2, Single.class, forClass(String.class),
() -> ref2.get().onSuccess("foo"), "foo");
}
@Test
public void deferredResultSubscriberWithNoValues() throws Exception {
Sinks.One<String> sink = Sinks.one();
testDeferredResultSubscriber(sink.asMono(), Mono.class, forClass(String.class),
() -> sink.emitEmpty(Sinks.EmitFailureHandler.FAIL_FAST),
null);
}
@Test
public void deferredResultSubscriberWithMultipleValues() throws Exception {
// JSON must be preferred for Flux<String> -> List<String> or else we stream
this.servletRequest.addHeader("Accept", "application/json");
Bar bar1 = new Bar("foo");
Bar bar2 = new Bar("bar");
Sinks.Many<Bar> sink = Sinks.many().unicast().onBackpressureBuffer();
testDeferredResultSubscriber(sink.asFlux(), Flux.class, forClass(Bar.class), () -> {
sink.tryEmitNext(bar1);
sink.tryEmitNext(bar2);
sink.tryEmitComplete();
}, Arrays.asList(bar1, bar2));
}
@Test
public void deferredResultSubscriberWithError() throws Exception {
IllegalStateException ex = new IllegalStateException();
// Mono
Sinks.One<String> sink = Sinks.one();
testDeferredResultSubscriber(sink.asMono(), Mono.class, forClass(String.class),
() -> sink.emitError(ex, Sinks.EmitFailureHandler.FAIL_FAST), ex);
// RxJava Single
AtomicReference<SingleEmitter<String>> ref2 = new AtomicReference<>();
Single<String> single2 = Single.create(ref2::set);
testDeferredResultSubscriber(single2, Single.class, forClass(String.class),
() -> ref2.get().onError(ex), ex);
}
@Test
public void mediaTypes() throws Exception {
// Media type from request
this.servletRequest.addHeader("Accept", "text/event-stream");
testSseResponse(true);
// Media type from "produces" attribute
Set<MediaType> types = Collections.singleton(MediaType.TEXT_EVENT_STREAM);
this.servletRequest.setAttribute(HandlerMapping.PRODUCIBLE_MEDIA_TYPES_ATTRIBUTE, types);
testSseResponse(true);
// No media type preferences
testSseResponse(false);
}
private void testSseResponse(boolean expectSseEmitter) throws Exception {
ResponseBodyEmitter emitter = handleValue(Flux.empty(), Flux.class, forClass(String.class));
Object actual = emitter instanceof SseEmitter;
assertThat(actual).isEqualTo(expectSseEmitter);
resetRequest();
}
@Test
public void writeServerSentEvents() throws Exception {
this.servletRequest.addHeader("Accept", "text/event-stream");
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
SseEmitter sseEmitter = (SseEmitter) handleValue(sink.asFlux(), Flux.class, forClass(String.class));
EmitterHandler emitterHandler = new EmitterHandler();
sseEmitter.initialize(emitterHandler);
sink.tryEmitNext("foo");
sink.tryEmitNext("bar");
sink.tryEmitNext("baz");
sink.tryEmitComplete();
assertThat(emitterHandler.getValuesAsText()).isEqualTo("data:foo\n\ndata:bar\n\ndata:baz\n\n");
}
@Test
public void writeServerSentEventsWithBuilder() throws Exception {
ResolvableType type = ResolvableType.forClassWithGenerics(ServerSentEvent.class, String.class);
Sinks.Many<ServerSentEvent<?>> sink = Sinks.many().unicast().onBackpressureBuffer();
SseEmitter sseEmitter = (SseEmitter) handleValue(sink.asFlux(), Flux.class, type);
EmitterHandler emitterHandler = new EmitterHandler();
sseEmitter.initialize(emitterHandler);
sink.tryEmitNext(ServerSentEvent.builder("foo").id("1").build());
sink.tryEmitNext(ServerSentEvent.builder("bar").id("2").build());
sink.tryEmitNext(ServerSentEvent.builder("baz").id("3").build());
sink.tryEmitComplete();
assertThat(emitterHandler.getValuesAsText()).isEqualTo("id:1\ndata:foo\n\nid:2\ndata:bar\n\nid:3\ndata:baz\n\n");
}
@Test
public void writeStreamJson() throws Exception {
this.servletRequest.addHeader("Accept", "application/x-ndjson");
Sinks.Many<Bar> sink = Sinks.many().unicast().onBackpressureBuffer();
ResponseBodyEmitter emitter = handleValue(sink.asFlux(), Flux.class, forClass(Bar.class));
EmitterHandler emitterHandler = new EmitterHandler();
emitter.initialize(emitterHandler);
ServletServerHttpResponse message = new ServletServerHttpResponse(this.servletResponse);
emitter.extendResponse(message);
Bar bar1 = new Bar("foo");
Bar bar2 = new Bar("bar");
sink.tryEmitNext(bar1);
sink.tryEmitNext(bar2);
sink.tryEmitComplete();
assertThat(message.getHeaders().getContentType().toString()).isEqualTo("application/x-ndjson");
assertThat(emitterHandler.getValues()).isEqualTo(Arrays.asList(bar1, "\n", bar2, "\n"));
}
@Test
public void writeText() throws Exception {
Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
ResponseBodyEmitter emitter = handleValue(sink.asFlux(), Flux.class, forClass(String.class));
EmitterHandler emitterHandler = new EmitterHandler();
emitter.initialize(emitterHandler);
sink.tryEmitNext("The quick");
sink.tryEmitNext(" brown fox jumps over ");
sink.tryEmitNext("the lazy dog");
sink.tryEmitComplete();
assertThat(emitterHandler.getValuesAsText()).isEqualTo("The quick brown fox jumps over the lazy dog");
}
@Test
public void writeFluxOfString() throws Exception {
// Default to "text/plain"
testEmitterContentType("text/plain");
// Same if no concrete media type
this.servletRequest.addHeader("Accept", "text/*");
testEmitterContentType("text/plain");
// Otherwise pick concrete media type
this.servletRequest.addHeader("Accept", "*/*, text/*, text/markdown");
testEmitterContentType("text/markdown");
// Any concrete media type
this.servletRequest.addHeader("Accept", "*/*, text/*, foo/bar");
testEmitterContentType("foo/bar");
// Including json
this.servletRequest.addHeader("Accept", "*/*, text/*, application/json");
testEmitterContentType("application/json");
}
private void testEmitterContentType(String expected) throws Exception {
ServletServerHttpResponse message = new ServletServerHttpResponse(this.servletResponse);
ResponseBodyEmitter emitter = handleValue(Flux.empty(), Flux.class, forClass(String.class));
emitter.extendResponse(message);
assertThat(message.getHeaders().getContentType().toString()).isEqualTo(expected);
resetRequest();
}
private void testDeferredResultSubscriber(Object returnValue, Class<?> asyncType,
ResolvableType elementType, Runnable produceTask, Object expected) throws Exception {
ResponseBodyEmitter emitter = handleValue(returnValue, asyncType, elementType);
assertThat(emitter).isNull();
assertThat(this.servletRequest.isAsyncStarted()).isTrue();
assertThat(WebAsyncUtils.getAsyncManager(this.webRequest).hasConcurrentResult()).isFalse();
produceTask.run();
assertThat(WebAsyncUtils.getAsyncManager(this.webRequest).hasConcurrentResult()).isTrue();
assertThat(WebAsyncUtils.getAsyncManager(this.webRequest).getConcurrentResult()).isEqualTo(expected);
resetRequest();
}
private ResponseBodyEmitter handleValue(Object returnValue, Class<?> asyncType,
ResolvableType genericType) throws Exception {
ModelAndViewContainer mavContainer = new ModelAndViewContainer();
MethodParameter returnType = on(TestController.class).resolveReturnType(asyncType, genericType);
return this.handler.handleValue(returnValue, returnType, mavContainer, this.webRequest);
}
@SuppressWarnings("unused")
static class TestController {
String handleString() { return null; }
Mono<String> handleMono() { return null; }
Single<String> handleSingle() { return null; }
Flux<Bar> handleFlux() { return null; }
Flux<String> handleFluxString() { return null; }
Flux<ServerSentEvent<String>> handleFluxSseEventBuilder() { return null; }
}
private static class EmitterHandler implements ResponseBodyEmitter.Handler {
private final List<Object> values = new ArrayList<>();
public List<?> getValues() {
return this.values;
}
public String getValuesAsText() {
return this.values.stream().map(Object::toString).collect(Collectors.joining());
}
@Override
public void send(Object data, MediaType mediaType) throws IOException {
this.values.add(data);
}
@Override
public void complete() {
}
@Override
public void completeWithError(Throwable failure) {
}
@Override
public void onTimeout(Runnable callback) {
}
@Override
public void onError(Consumer<Throwable> callback) {
}
@Override
public void onCompletion(Runnable callback) {
}
}
private static class Bar {
private final String value;
public Bar(String value) {
this.value = value;
}
@SuppressWarnings("unused")
public String getValue() {
return this.value;
}
}
}
相关信息
相关文章
spring AbstractRequestAttributesArgumentResolverTests 源码
spring AbstractServletHandlerMethodTests 源码
spring DeferredResultReturnValueHandlerTests 源码
spring ExceptionHandlerExceptionResolverTests 源码
spring ExtendedServletRequestDataBinderTests 源码
spring HandlerMethodAnnotationDetectionTests 源码
spring HttpEntityMethodProcessorMockTests 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦