spring ResponseBodyEmitterReturnValueHandler 源码
spring ResponseBodyEmitterReturnValueHandler 代码
文件路径:/spring-webmvc/src/main/java/org/springframework/web/servlet/mvc/method/annotation/ResponseBodyEmitterReturnValueHandler.java
/*
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.web.servlet.mvc.method.annotation;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import jakarta.servlet.ServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.core.MethodParameter;
import org.springframework.core.ReactiveAdapterRegistry;
import org.springframework.core.ResolvableType;
import org.springframework.core.task.TaskExecutor;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.http.converter.HttpMessageConverter;
import org.springframework.http.converter.StringHttpMessageConverter;
import org.springframework.http.server.DelegatingServerHttpResponse;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.http.server.ServletServerHttpResponse;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.web.accept.ContentNegotiationManager;
import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.context.request.async.WebAsyncUtils;
import org.springframework.web.filter.ShallowEtagHeaderFilter;
import org.springframework.web.method.support.HandlerMethodReturnValueHandler;
import org.springframework.web.method.support.ModelAndViewContainer;
/**
* Handler for return values of type {@link ResponseBodyEmitter} and subclasses
* such as {@link SseEmitter} including the same types wrapped with
* {@link ResponseEntity}.
*
* <p>As of 5.0 also supports reactive return value types for any reactive
* library with registered adapters in {@link ReactiveAdapterRegistry}.
*
* @author Rossen Stoyanchev
* @since 4.2
*/
public class ResponseBodyEmitterReturnValueHandler implements HandlerMethodReturnValueHandler {
private final List<HttpMessageConverter<?>> sseMessageConverters;
private final ReactiveTypeHandler reactiveHandler;
/**
* Simple constructor with reactive type support based on a default instance of
* {@link ReactiveAdapterRegistry},
* {@link org.springframework.core.task.SyncTaskExecutor}, and
* {@link ContentNegotiationManager} with an Accept header strategy.
*/
public ResponseBodyEmitterReturnValueHandler(List<HttpMessageConverter<?>> messageConverters) {
Assert.notEmpty(messageConverters, "HttpMessageConverter List must not be empty");
this.sseMessageConverters = initSseConverters(messageConverters);
this.reactiveHandler = new ReactiveTypeHandler();
}
/**
* Complete constructor with pluggable "reactive" type support.
* @param messageConverters converters to write emitted objects with
* @param registry for reactive return value type support
* @param executor for blocking I/O writes of items emitted from reactive types
* @param manager for detecting streaming media types
* @since 5.0
*/
public ResponseBodyEmitterReturnValueHandler(List<HttpMessageConverter<?>> messageConverters,
ReactiveAdapterRegistry registry, TaskExecutor executor, ContentNegotiationManager manager) {
Assert.notEmpty(messageConverters, "HttpMessageConverter List must not be empty");
this.sseMessageConverters = initSseConverters(messageConverters);
this.reactiveHandler = new ReactiveTypeHandler(registry, executor, manager);
}
private static List<HttpMessageConverter<?>> initSseConverters(List<HttpMessageConverter<?>> converters) {
for (HttpMessageConverter<?> converter : converters) {
if (converter.canWrite(String.class, MediaType.TEXT_PLAIN)) {
return converters;
}
}
List<HttpMessageConverter<?>> result = new ArrayList<>(converters.size() + 1);
result.add(new StringHttpMessageConverter(StandardCharsets.UTF_8));
result.addAll(converters);
return result;
}
@Override
public boolean supportsReturnType(MethodParameter returnType) {
Class<?> bodyType = ResponseEntity.class.isAssignableFrom(returnType.getParameterType()) ?
ResolvableType.forMethodParameter(returnType).getGeneric().resolve() :
returnType.getParameterType();
return (bodyType != null && (ResponseBodyEmitter.class.isAssignableFrom(bodyType) ||
this.reactiveHandler.isReactiveType(bodyType)));
}
@Override
@SuppressWarnings("resource")
public void handleReturnValue(@Nullable Object returnValue, MethodParameter returnType,
ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {
if (returnValue == null) {
mavContainer.setRequestHandled(true);
return;
}
HttpServletResponse response = webRequest.getNativeResponse(HttpServletResponse.class);
Assert.state(response != null, "No HttpServletResponse");
ServerHttpResponse outputMessage = new ServletServerHttpResponse(response);
if (returnValue instanceof ResponseEntity<?> responseEntity) {
response.setStatus(responseEntity.getStatusCode().value());
outputMessage.getHeaders().putAll(responseEntity.getHeaders());
returnValue = responseEntity.getBody();
returnType = returnType.nested();
if (returnValue == null) {
mavContainer.setRequestHandled(true);
outputMessage.flush();
return;
}
}
ServletRequest request = webRequest.getNativeRequest(ServletRequest.class);
Assert.state(request != null, "No ServletRequest");
ResponseBodyEmitter emitter;
if (returnValue instanceof ResponseBodyEmitter) {
emitter = (ResponseBodyEmitter) returnValue;
}
else {
emitter = this.reactiveHandler.handleValue(returnValue, returnType, mavContainer, webRequest);
if (emitter == null) {
// Not streaming: write headers without committing response..
outputMessage.getHeaders().forEach((headerName, headerValues) -> {
for (String headerValue : headerValues) {
response.addHeader(headerName, headerValue);
}
});
return;
}
}
emitter.extendResponse(outputMessage);
// At this point we know we're streaming..
ShallowEtagHeaderFilter.disableContentCaching(request);
// Wrap the response to ignore further header changes
// Headers will be flushed at the first write
outputMessage = new StreamingServletServerHttpResponse(outputMessage);
HttpMessageConvertingHandler handler;
try {
DeferredResult<?> deferredResult = new DeferredResult<>(emitter.getTimeout());
WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(deferredResult, mavContainer);
handler = new HttpMessageConvertingHandler(outputMessage, deferredResult);
}
catch (Throwable ex) {
emitter.initializeWithError(ex);
throw ex;
}
emitter.initialize(handler);
}
/**
* ResponseBodyEmitter.Handler that writes with HttpMessageConverter's.
*/
private class HttpMessageConvertingHandler implements ResponseBodyEmitter.Handler {
private final ServerHttpResponse outputMessage;
private final DeferredResult<?> deferredResult;
public HttpMessageConvertingHandler(ServerHttpResponse outputMessage, DeferredResult<?> deferredResult) {
this.outputMessage = outputMessage;
this.deferredResult = deferredResult;
}
@Override
public void send(Object data, @Nullable MediaType mediaType) throws IOException {
sendInternal(data, mediaType);
}
@SuppressWarnings("unchecked")
private <T> void sendInternal(T data, @Nullable MediaType mediaType) throws IOException {
for (HttpMessageConverter<?> converter : ResponseBodyEmitterReturnValueHandler.this.sseMessageConverters) {
if (converter.canWrite(data.getClass(), mediaType)) {
((HttpMessageConverter<T>) converter).write(data, mediaType, this.outputMessage);
this.outputMessage.flush();
return;
}
}
throw new IllegalArgumentException("No suitable converter for " + data.getClass());
}
@Override
public void complete() {
try {
this.outputMessage.flush();
this.deferredResult.setResult(null);
}
catch (IOException ex) {
this.deferredResult.setErrorResult(ex);
}
}
@Override
public void completeWithError(Throwable failure) {
this.deferredResult.setErrorResult(failure);
}
@Override
public void onTimeout(Runnable callback) {
this.deferredResult.onTimeout(callback);
}
@Override
public void onError(Consumer<Throwable> callback) {
this.deferredResult.onError(callback);
}
@Override
public void onCompletion(Runnable callback) {
this.deferredResult.onCompletion(callback);
}
}
/**
* Wrap to silently ignore header changes HttpMessageConverter's that would
* otherwise cause HttpHeaders to raise exceptions.
*/
private static class StreamingServletServerHttpResponse extends DelegatingServerHttpResponse {
private final HttpHeaders mutableHeaders = new HttpHeaders();
public StreamingServletServerHttpResponse(ServerHttpResponse delegate) {
super(delegate);
this.mutableHeaders.putAll(delegate.getHeaders());
}
@Override
public HttpHeaders getHeaders() {
return this.mutableHeaders;
}
}
}
相关信息
相关文章
spring AbstractMappingJacksonResponseBodyAdvice 源码
spring AbstractMessageConverterMethodArgumentResolver 源码
spring AbstractMessageConverterMethodProcessor 源码
spring AsyncTaskMethodReturnValueHandler 源码
spring CallableMethodReturnValueHandler 源码
spring ContinuationHandlerMethodArgumentResolver 源码
spring DeferredResultMethodReturnValueHandler 源码
spring ExceptionHandlerExceptionResolver 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦