spring MessageHeaderAccessor 源码
spring MessageHeaderAccessor 代码
文件路径:/spring-messaging/src/main/java/org/springframework/messaging/support/MessageHeaderAccessor.java
/*
* Copyright 2002-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.messaging.support;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.util.Assert;
import org.springframework.util.IdGenerator;
import org.springframework.util.MimeType;
import org.springframework.util.MimeTypeUtils;
import org.springframework.util.ObjectUtils;
import org.springframework.util.PatternMatchUtils;
import org.springframework.util.StringUtils;
/**
* Wrapper around {@link MessageHeaders} that provides extra features such as
* strongly typed accessors for specific headers, the ability to leave headers
* in a {@link Message} mutable, and the option to suppress automatic generation
* of {@link MessageHeaders#ID id} and {@link MessageHeaders#TIMESTAMP
* timesteamp} headers. Subclasses such as {@link NativeMessageHeaderAccessor}
* and others provide support for managing processing vs external source headers
* as well as protocol specific headers.
*
* <p>Below is a workflow to initialize headers via {@code MessageHeaderAccessor},
* or one of its subclasses, then create a {@link Message}, and then re-obtain
* the accessor possibly from a different component:
* <pre class="code">
* // Create a message with headers
* MessageHeaderAccessor accessor = new MessageHeaderAccessor();
* accessor.setHeader("foo", "bar");
* MessageHeaders headers = accessor.getMessageHeaders();
* Message message = MessageBuilder.createMessage("payload", headers);
*
* // Later on
* MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message);
* Assert.notNull(accessor, "No MessageHeaderAccessor");
* </pre>
*
* <p>In order for the above to work, all participating components must use
* {@code MessageHeaders} to create, access, or modify headers, or otherwise
* {@link MessageHeaderAccessor#getAccessor(Message, Class)} will return null.
* Below is a workflow that shows how headers are created and left mutable,
* then modified possibly by a different component, and finally made immutable
* perhaps before the possibility of being accessed on a different thread:
* <pre class="code">
* // Create a message with mutable headers
* MessageHeaderAccessor accessor = new MessageHeaderAccessor();
* accessor.setHeader("foo", "bar");
* accessor.setLeaveMutable(true);
* MessageHeaders headers = accessor.getMessageHeaders();
* Message message = MessageBuilder.createMessage("payload", headers);
*
* // Later on
* MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message);
* if (accessor.isMutable()) {
* // It's mutable, just change the headers
* accessor.setHeader("bar", "baz");
* }
* else {
* // It's not, so get a mutable copy, change and re-create
* accessor = MessageHeaderAccessor.getMutableAccessor(message);
* accessor.setHeader("bar", "baz");
* accessor.setLeaveMutable(true); // leave mutable again or not?
* message = MessageBuilder.createMessage(message.getPayload(), accessor);
* }
*
* // Make the accessor immutable
* MessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message);
* accessor.setImmutable();
* </pre>
*
* @author Rossen Stoyanchev
* @author Juergen Hoeller
* @since 4.0
*/
public class MessageHeaderAccessor {
/**
* The default charset used for headers.
*/
public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
private static final MimeType[] READABLE_MIME_TYPES = new MimeType[] {
MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_XML,
new MimeType("text", "*"), new MimeType("application", "*+json"), new MimeType("application", "*+xml")
};
private final MutableMessageHeaders headers;
private boolean leaveMutable = false;
private boolean modified = false;
private boolean enableTimestamp = false;
@Nullable
private IdGenerator idGenerator;
/**
* A constructor to create new headers.
*/
public MessageHeaderAccessor() {
this(null);
}
/**
* A constructor accepting the headers of an existing message to copy.
* @param message a message to copy the headers from, or {@code null} if none
*/
public MessageHeaderAccessor(@Nullable Message<?> message) {
this.headers = new MutableMessageHeaders(message != null ? message.getHeaders() : null);
}
/**
* Build a 'nested' accessor for the given message.
* @param message the message to build a new accessor for
* @return the nested accessor (typically a specific subclass)
*/
protected MessageHeaderAccessor createAccessor(Message<?> message) {
return new MessageHeaderAccessor(message);
}
// Configuration properties
/**
* By default, when {@link #getMessageHeaders()} is called, {@code "this"}
* {@code MessageHeaderAccessor} instance can no longer be used to modify the
* underlying message headers and the returned {@code MessageHeaders} is immutable.
* <p>However when this is set to {@code true}, the returned (underlying)
* {@code MessageHeaders} instance remains mutable. To make further modifications
* continue to use the same accessor instance or re-obtain it via:<br>
* {@link MessageHeaderAccessor#getAccessor(Message, Class)
* MessageHeaderAccessor.getAccessor(Message, Class)}
* <p>When modifications are complete use {@link #setImmutable()} to prevent
* further changes. The intended use case for this mechanism is initialization
* of a Message within a single thread.
* <p>By default this is set to {@code false}.
* @since 4.1
*/
public void setLeaveMutable(boolean leaveMutable) {
Assert.state(this.headers.isMutable(), "Already immutable");
this.leaveMutable = leaveMutable;
}
/**
* By default, when {@link #getMessageHeaders()} is called, {@code "this"}
* {@code MessageHeaderAccessor} instance can no longer be used to modify the
* underlying message headers. However if {@link #setLeaveMutable(boolean)}
* is used, this method is necessary to indicate explicitly when the
* {@code MessageHeaders} instance should no longer be modified.
* @since 4.1
*/
public void setImmutable() {
this.headers.setImmutable();
}
/**
* Whether the underlying headers can still be modified.
* @since 4.1
*/
public boolean isMutable() {
return this.headers.isMutable();
}
/**
* Mark the underlying message headers as modified.
* @param modified typically {@code true}, or {@code false} to reset the flag
* @since 4.1
*/
protected void setModified(boolean modified) {
this.modified = modified;
}
/**
* Check whether the underlying message headers have been marked as modified.
* @return {@code true} if the flag has been set, {@code false} otherwise
*/
public boolean isModified() {
return this.modified;
}
/**
* A package private mechanism to enables the automatic addition of the
* {@link org.springframework.messaging.MessageHeaders#TIMESTAMP} header.
* <p>By default, this property is set to {@code false}.
* @see IdTimestampMessageHeaderInitializer
*/
void setEnableTimestamp(boolean enableTimestamp) {
this.enableTimestamp = enableTimestamp;
}
/**
* A package-private mechanism to configure the IdGenerator strategy to use.
* <p>By default this property is not set in which case the default IdGenerator
* in {@link org.springframework.messaging.MessageHeaders} is used.
* @see IdTimestampMessageHeaderInitializer
*/
void setIdGenerator(IdGenerator idGenerator) {
this.idGenerator = idGenerator;
}
// Accessors for the resulting MessageHeaders
/**
* Return the underlying {@code MessageHeaders} instance.
* <p>Unless {@link #setLeaveMutable(boolean)} was set to {@code true}, after
* this call, the headers are immutable and this accessor can no longer
* modify them.
* <p>This method always returns the same {@code MessageHeaders} instance if
* invoked multiples times. To obtain a copy of the underlying headers, use
* {@link #toMessageHeaders()} or {@link #toMap()} instead.
* @since 4.1
*/
public MessageHeaders getMessageHeaders() {
if (!this.leaveMutable) {
setImmutable();
}
return this.headers;
}
/**
* Return a copy of the underlying header values as a {@link MessageHeaders} object.
* <p>This method can be invoked many times, with modifications in between
* where each new call returns a fresh copy of the current header values.
* @since 4.1
*/
public MessageHeaders toMessageHeaders() {
return new MessageHeaders(this.headers);
}
/**
* Return a copy of the underlying header values as a plain {@link Map} object.
* <p>This method can be invoked many times, with modifications in between
* where each new call returns a fresh copy of the current header values.
*/
public Map<String, Object> toMap() {
return new HashMap<>(this.headers);
}
// Generic header accessors
/**
* Retrieve the value for the header with the given name.
* @param headerName the name of the header
* @return the associated value, or {@code null} if none found
*/
@Nullable
public Object getHeader(String headerName) {
return this.headers.get(headerName);
}
/**
* Set the value for the given header name.
* <p>If the provided value is {@code null}, the header will be removed.
*/
public void setHeader(String name, @Nullable Object value) {
if (isReadOnly(name)) {
throw new IllegalArgumentException("'" + name + "' header is read-only");
}
verifyType(name, value);
if (value != null) {
// Modify header if necessary
if (!ObjectUtils.nullSafeEquals(value, getHeader(name))) {
this.modified = true;
this.headers.getRawHeaders().put(name, value);
}
}
else {
// Remove header if available
if (this.headers.containsKey(name)) {
this.modified = true;
this.headers.getRawHeaders().remove(name);
}
}
}
protected void verifyType(@Nullable String headerName, @Nullable Object headerValue) {
if (headerName != null && headerValue != null) {
if (MessageHeaders.ERROR_CHANNEL.equals(headerName) ||
MessageHeaders.REPLY_CHANNEL.endsWith(headerName)) {
if (!(headerValue instanceof MessageChannel || headerValue instanceof String)) {
throw new IllegalArgumentException(
"'" + headerName + "' header value must be a MessageChannel or String");
}
}
}
}
/**
* Set the value for the given header name only if the header name is not
* already associated with a value.
*/
public void setHeaderIfAbsent(String name, Object value) {
if (getHeader(name) == null) {
setHeader(name, value);
}
}
/**
* Remove the value for the given header name.
*/
public void removeHeader(String headerName) {
if (StringUtils.hasLength(headerName) && !isReadOnly(headerName)) {
setHeader(headerName, null);
}
}
/**
* Removes all headers provided via array of 'headerPatterns'.
* <p>As the name suggests, array may contain simple matching patterns for header
* names. Supported pattern styles are: "xxx*", "*xxx", "*xxx*" and "xxx*yyy".
*/
public void removeHeaders(String... headerPatterns) {
List<String> headersToRemove = new ArrayList<>();
for (String pattern : headerPatterns) {
if (StringUtils.hasLength(pattern)){
if (pattern.contains("*")){
headersToRemove.addAll(getMatchingHeaderNames(pattern, this.headers));
}
else {
headersToRemove.add(pattern);
}
}
}
for (String headerToRemove : headersToRemove) {
removeHeader(headerToRemove);
}
}
private List<String> getMatchingHeaderNames(String pattern, @Nullable Map<String, Object> headers) {
if (headers == null) {
return Collections.emptyList();
}
List<String> matchingHeaderNames = new ArrayList<>();
for (String key : headers.keySet()) {
if (PatternMatchUtils.simpleMatch(pattern, key)) {
matchingHeaderNames.add(key);
}
}
return matchingHeaderNames;
}
/**
* Copy the name-value pairs from the provided Map.
* <p>This operation will overwrite any existing values. Use
* {@link #copyHeadersIfAbsent(Map)} to avoid overwriting values.
*/
public void copyHeaders(@Nullable Map<String, ?> headersToCopy) {
if (headersToCopy == null || this.headers == headersToCopy) {
return;
}
headersToCopy.forEach((key, value) -> {
if (!isReadOnly(key)) {
setHeader(key, value);
}
});
}
/**
* Copy the name-value pairs from the provided Map.
* <p>This operation will <em>not</em> overwrite any existing values.
*/
public void copyHeadersIfAbsent(@Nullable Map<String, ?> headersToCopy) {
if (headersToCopy == null || this.headers == headersToCopy) {
return;
}
headersToCopy.forEach((key, value) -> {
if (!isReadOnly(key)) {
setHeaderIfAbsent(key, value);
}
});
}
protected boolean isReadOnly(String headerName) {
return (MessageHeaders.ID.equals(headerName) || MessageHeaders.TIMESTAMP.equals(headerName));
}
// Specific header accessors
@Nullable
public UUID getId() {
Object value = getHeader(MessageHeaders.ID);
if (value == null) {
return null;
}
return (value instanceof UUID ? (UUID) value : UUID.fromString(value.toString()));
}
@Nullable
public Long getTimestamp() {
Object value = getHeader(MessageHeaders.TIMESTAMP);
if (value == null) {
return null;
}
return (value instanceof Long ? (Long) value : Long.parseLong(value.toString()));
}
public void setContentType(MimeType contentType) {
setHeader(MessageHeaders.CONTENT_TYPE, contentType);
}
@Nullable
public MimeType getContentType() {
Object value = getHeader(MessageHeaders.CONTENT_TYPE);
if (value == null) {
return null;
}
return (value instanceof MimeType ? (MimeType) value : MimeType.valueOf(value.toString()));
}
private Charset getCharset() {
MimeType contentType = getContentType();
Charset charset = (contentType != null ? contentType.getCharset() : null);
return (charset != null ? charset : DEFAULT_CHARSET);
}
public void setReplyChannelName(String replyChannelName) {
setHeader(MessageHeaders.REPLY_CHANNEL, replyChannelName);
}
public void setReplyChannel(MessageChannel replyChannel) {
setHeader(MessageHeaders.REPLY_CHANNEL, replyChannel);
}
@Nullable
public Object getReplyChannel() {
return getHeader(MessageHeaders.REPLY_CHANNEL);
}
public void setErrorChannelName(String errorChannelName) {
setHeader(MessageHeaders.ERROR_CHANNEL, errorChannelName);
}
public void setErrorChannel(MessageChannel errorChannel) {
setHeader(MessageHeaders.ERROR_CHANNEL, errorChannel);
}
@Nullable
public Object getErrorChannel() {
return getHeader(MessageHeaders.ERROR_CHANNEL);
}
// Log message stuff
/**
* Return a concise message for logging purposes.
* @param payload the payload that corresponds to the headers.
* @return the message
*/
public String getShortLogMessage(Object payload) {
return "headers=" + this.headers.toString() + getShortPayloadLogMessage(payload);
}
/**
* Return a more detailed message for logging purposes.
* @param payload the payload that corresponds to the headers.
* @return the message
*/
public String getDetailedLogMessage(@Nullable Object payload) {
return "headers=" + this.headers.toString() + getDetailedPayloadLogMessage(payload);
}
protected String getShortPayloadLogMessage(Object payload) {
if (payload instanceof String payloadText) {
return (payloadText.length() < 80) ?
" payload=" + payloadText :
" payload=" + payloadText.substring(0, 80) + "...(truncated)";
}
else if (payload instanceof byte[] bytes) {
if (isReadableContentType()) {
return (bytes.length < 80) ?
" payload=" + new String(bytes, getCharset()) :
" payload=" + new String(Arrays.copyOf(bytes, 80), getCharset()) + "...(truncated)";
}
else {
return " payload=byte[" + bytes.length + "]";
}
}
else {
String payloadText = payload.toString();
return (payloadText.length() < 80) ?
" payload=" + payloadText :
" payload=" + ObjectUtils.identityToString(payload);
}
}
protected String getDetailedPayloadLogMessage(@Nullable Object payload) {
if (payload instanceof String) {
return " payload=" + payload;
}
else if (payload instanceof byte[] bytes) {
if (isReadableContentType()) {
return " payload=" + new String(bytes, getCharset());
}
else {
return " payload=byte[" + bytes.length + "]";
}
}
else {
return " payload=" + payload;
}
}
protected boolean isReadableContentType() {
MimeType contentType = getContentType();
for (MimeType mimeType : READABLE_MIME_TYPES) {
if (mimeType.includes(contentType)) {
return true;
}
}
return false;
}
@Override
public String toString() {
return getClass().getSimpleName() + " [headers=" + this.headers + "]";
}
// Static factory methods
/**
* Return the original {@code MessageHeaderAccessor} used to create the headers
* of the given {@code Message}, or {@code null} if that's not available or if
* its type does not match the required type.
* <p>This is for cases where the existence of an accessor is strongly expected
* (followed up with an assertion) or where an accessor will be created otherwise.
* @param message the message to get an accessor for
* @return an accessor instance of the specified type, or {@code null} if none
* @since 5.1.19
*/
@Nullable
public static MessageHeaderAccessor getAccessor(Message<?> message) {
return getAccessor(message.getHeaders(), null);
}
/**
* Return the original {@code MessageHeaderAccessor} used to create the headers
* of the given {@code Message}, or {@code null} if that's not available or if
* its type does not match the required type.
* <p>This is for cases where the existence of an accessor is strongly expected
* (followed up with an assertion) or where an accessor will be created otherwise.
* @param message the message to get an accessor for
* @param requiredType the required accessor type (or {@code null} for any)
* @return an accessor instance of the specified type, or {@code null} if none
* @since 4.1
*/
@Nullable
public static <T extends MessageHeaderAccessor> T getAccessor(Message<?> message, @Nullable Class<T> requiredType) {
return getAccessor(message.getHeaders(), requiredType);
}
/**
* A variation of {@link #getAccessor(org.springframework.messaging.Message, Class)}
* with a {@code MessageHeaders} instance instead of a {@code Message}.
* <p>This is for cases when a full message may not have been created yet.
* @param messageHeaders the message headers to get an accessor for
* @param requiredType the required accessor type (or {@code null} for any)
* @return an accessor instance of the specified type, or {@code null} if none
* @since 4.1
*/
@SuppressWarnings("unchecked")
@Nullable
public static <T extends MessageHeaderAccessor> T getAccessor(
MessageHeaders messageHeaders, @Nullable Class<T> requiredType) {
if (messageHeaders instanceof MutableMessageHeaders mutableHeaders) {
MessageHeaderAccessor headerAccessor = mutableHeaders.getAccessor();
if (requiredType == null || requiredType.isInstance(headerAccessor)) {
return (T) headerAccessor;
}
}
return null;
}
/**
* Return a mutable {@code MessageHeaderAccessor} for the given message attempting
* to match the type of accessor used to create the message headers, or otherwise
* wrapping the message with a {@code MessageHeaderAccessor} instance.
* <p>This is for cases where a header needs to be updated in generic code
* while preserving the accessor type for downstream processing.
* @return an accessor of the required type (never {@code null})
* @since 4.1
*/
public static MessageHeaderAccessor getMutableAccessor(Message<?> message) {
if (message.getHeaders() instanceof MutableMessageHeaders) {
MutableMessageHeaders mutableHeaders = (MutableMessageHeaders) message.getHeaders();
MessageHeaderAccessor accessor = mutableHeaders.getAccessor();
return (accessor.isMutable() ? accessor : accessor.createAccessor(message));
}
return new MessageHeaderAccessor(message);
}
/**
* Extension of {@link MessageHeaders} that helps to preserve the link to
* the outer {@link MessageHeaderAccessor} instance that created it as well
* as keeps track of whether headers are still mutable.
*/
@SuppressWarnings("serial")
private class MutableMessageHeaders extends MessageHeaders {
private boolean mutable = true;
public MutableMessageHeaders(@Nullable Map<String, Object> headers) {
super(headers, MessageHeaders.ID_VALUE_NONE, -1L);
}
@Override
public Map<String, Object> getRawHeaders() {
Assert.state(this.mutable, "Already immutable");
return super.getRawHeaders();
}
public void setImmutable() {
if (!this.mutable) {
return;
}
if (getId() == null) {
IdGenerator idGenerator = (MessageHeaderAccessor.this.idGenerator != null ?
MessageHeaderAccessor.this.idGenerator : MessageHeaders.getIdGenerator());
UUID id = idGenerator.generateId();
if (id != MessageHeaders.ID_VALUE_NONE) {
getRawHeaders().put(ID, id);
}
}
if (getTimestamp() == null) {
if (MessageHeaderAccessor.this.enableTimestamp) {
getRawHeaders().put(TIMESTAMP, System.currentTimeMillis());
}
}
this.mutable = false;
}
public boolean isMutable() {
return this.mutable;
}
public MessageHeaderAccessor getAccessor() {
return MessageHeaderAccessor.this;
}
protected Object writeReplace() {
// Serialize as regular MessageHeaders (without MessageHeaderAccessor reference)
return new MessageHeaders(this);
}
}
}
相关信息
相关文章
spring AbstractHeaderMapper 源码
spring AbstractMessageChannel 源码
spring AbstractSubscribableChannel 源码
spring ExecutorChannelInterceptor 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦