spring Jackson2Tokenizer 源码

  • 2022-08-08
  • 浏览 (427)

spring Jackson2Tokenizer 代码

文件路径:/spring-web/src/main/java/org/springframework/http/codec/json/Jackson2Tokenizer.java

/*
 * Copyright 2002-2022 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.http.codec.json;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.async.ByteArrayFeeder;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.deser.DefaultDeserializationContext;
import com.fasterxml.jackson.databind.util.TokenBuffer;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;

import org.springframework.core.codec.DecodingException;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferLimitException;
import org.springframework.core.io.buffer.DataBufferUtils;

/**
 * {@link Function} to transform a JSON stream of arbitrary size, byte array
 * chunks into a {@code Flux<TokenBuffer>} where each token buffer is a
 * well-formed JSON object.
 *
 * @author Arjen Poutsma
 * @author Rossen Stoyanchev
 * @author Juergen Hoeller
 * @since 5.0
 */
final class Jackson2Tokenizer {

	/** Non-blocking parser that produces the JSON tokens for all fed input. */
	private final JsonParser parser;

	/** Context passed to every {@link TokenBuffer} created by {@link #createToken()}. */
	private final DeserializationContext deserializationContext;

	/** Whether the elements of a top-level array are emitted as individual token buffers. */
	private final boolean tokenizeArrayElements;

	/** Setting applied to every new {@link TokenBuffer} via {@code forceUseOfBigDecimal}. */
	private final boolean forceUseOfBigDecimal;

	/** Byte limit checked by {@link #assertInMemorySize}; a negative value disables the check. */
	private final int maxInMemorySize;

	/** Number of currently open JSON objects (START_OBJECT seen without matching END_OBJECT). */
	private int objectDepth;

	/** Number of currently open JSON arrays (START_ARRAY seen without matching END_ARRAY). */
	private int arrayDepth;

	/** Bytes fed since the last {@link #tokenize(DataBuffer)} call that yielded a token buffer. */
	private int byteCount;

	/** Buffer that collects the tokens of the value currently being assembled. */
	private TokenBuffer tokenBuffer;


	// TODO: change to ByteBufferFeeder when supported by Jackson
	// See https://github.com/FasterXML/jackson-core/issues/478
	private final ByteArrayFeeder inputFeeder;


	/**
	 * Create a tokenizer around the given non-blocking parser.
	 * @param parser the parser to read tokens from; its non-blocking input
	 * feeder is cast to {@link ByteArrayFeeder}
	 * @param deserializationContext the context to create token buffers with
	 * @param tokenizeArrayElements whether to emit top-level array elements individually
	 * @param forceUseOfBigDecimal the value to apply to each created token buffer
	 * @param maxInMemorySize the byte limit, or a negative value for no limit
	 */
	private Jackson2Tokenizer(JsonParser parser, DeserializationContext deserializationContext,
			boolean tokenizeArrayElements, boolean forceUseOfBigDecimal, int maxInMemorySize) {

		this.parser = parser;
		this.deserializationContext = deserializationContext;
		this.tokenizeArrayElements = tokenizeArrayElements;
		this.forceUseOfBigDecimal = forceUseOfBigDecimal;
		this.inputFeeder = (ByteArrayFeeder) this.parser.getNonBlockingInputFeeder();
		this.maxInMemorySize = maxInMemorySize;
		// createToken() reads the fields assigned above, so it must come last
		this.tokenBuffer = createToken();
	}



	/**
	 * Copy the readable bytes of the given buffer, release it, feed the bytes
	 * to the parser and collect the token buffers that are complete so far.
	 * @param dataBuffer the next chunk of input; always released by this method
	 * @return the token buffers completed by this chunk, possibly empty
	 * @throws DecodingException if the parser reports a {@link JsonProcessingException}
	 * @throws DataBufferLimitException if the configured byte limit is exceeded
	 */
	private List<TokenBuffer> tokenize(DataBuffer dataBuffer) {
		int bufferSize;
		byte[] bytes;
		try {
			bufferSize = dataBuffer.readableByteCount();
			bytes = new byte[bufferSize];
			dataBuffer.read(bytes);
		}
		finally {
			// Release right after copying, and also when the copy itself fails,
			// so that the buffer cannot leak on an unexpected exception.
			DataBufferUtils.release(dataBuffer);
		}

		try {
			this.inputFeeder.feedInput(bytes, 0, bytes.length);
			List<TokenBuffer> result = parseTokenBufferFlux();
			assertInMemorySize(bufferSize, result);
			return result;
		}
		catch (JsonProcessingException ex) {
			throw new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex);
		}
		catch (IOException ex) {
			throw Exceptions.propagate(ex);
		}
	}

	/**
	 * Return a deferred {@code Flux} that, on subscription, signals end of
	 * input to the feeder and emits whatever token buffers can still be parsed.
	 * @return the remaining token buffers
	 */
	private Flux<TokenBuffer> endOfInput() {
		return Flux.defer(() -> {
			this.inputFeeder.endOfInput();
			try {
				return Flux.fromIterable(parseTokenBufferFlux());
			}
			catch (JsonProcessingException ex) {
				throw new DecodingException("JSON decoding error: " + ex.getOriginalMessage(), ex);
			}
			catch (IOException ex) {
				throw Exceptions.propagate(ex);
			}
		});
	}

	/**
	 * Drain all tokens currently available from the parser and route each one
	 * to the processing strategy selected by {@code tokenizeArrayElements}.
	 * @return the token buffers completed while draining, possibly empty
	 * @throws IOException if the parser fails to produce the next token
	 */
	private List<TokenBuffer> parseTokenBufferFlux() throws IOException {
		List<TokenBuffer> result = new ArrayList<>();

		// SPR-16151: Smile data format uses null to separate documents
		boolean previousNull = false;
		while (!this.parser.isClosed()) {
			JsonToken token = this.parser.nextToken();
			// Stop when no more input is buffered, or on a second consecutive null
			if (token == JsonToken.NOT_AVAILABLE ||
					(token == null && previousNull)) {
				break;
			}
			else if (token == null) { // !previousNull
				// A single null is skipped; only remember that it was seen
				previousNull = true;
				continue;
			}
			else {
				previousNull = false;
			}
			updateDepth(token);
			if (!this.tokenizeArrayElements) {
				processTokenNormal(token, result);
			}
			else {
				processTokenArray(token, result);
			}
		}
		return result;
	}

	/**
	 * Adjust the object and array nesting counters for a structural token.
	 * Tokens other than the four start/end markers leave the counters unchanged.
	 * @param token the token just read from the parser
	 */
	private void updateDepth(JsonToken token) {
		switch (token) {
			case START_OBJECT -> this.objectDepth++;
			case END_OBJECT -> this.objectDepth--;
			case START_ARRAY -> this.arrayDepth++;
			case END_ARRAY -> this.arrayDepth--;
		}
	}

	/**
	 * Copy the current parser event into the pending buffer and emit the buffer
	 * once a complete top-level value has been read, i.e. a struct end or a
	 * scalar while no object and no array is open.
	 * @param token the token just read from the parser
	 * @param result the list to add a completed token buffer to
	 * @throws IOException if copying the current event fails
	 */
	private void processTokenNormal(JsonToken token, List<TokenBuffer> result) throws IOException {
		this.tokenBuffer.copyCurrentEvent(this.parser);

		if ((token.isStructEnd() || token.isScalarValue()) && this.objectDepth == 0 && this.arrayDepth == 0) {
			result.add(this.tokenBuffer);
			this.tokenBuffer = createToken();
		}
	}

	/**
	 * Copy the current parser event into the pending buffer, except for the
	 * start and end markers of the top-level array, and emit the buffer when an
	 * object end or a scalar is read with no object open and at most one array open.
	 * @param token the token just read from the parser
	 * @param result the list to add a completed token buffer to
	 * @throws IOException if copying the current event fails
	 */
	private void processTokenArray(JsonToken token, List<TokenBuffer> result) throws IOException {
		if (!isTopLevelArrayToken(token)) {
			this.tokenBuffer.copyCurrentEvent(this.parser);
		}

		// NOTE(review): an END_ARRAY that closes a nested array element (arrayDepth back
		// at 1) does not trigger emission here, so an array of arrays would seemingly
		// never be emitted - confirm whether such input is expected.
		if (this.objectDepth == 0 && (this.arrayDepth == 0 || this.arrayDepth == 1) &&
				(token == JsonToken.END_OBJECT || token.isScalarValue())) {
			result.add(this.tokenBuffer);
			this.tokenBuffer = createToken();
		}
	}

	/**
	 * Create an empty token buffer bound to this tokenizer's parser and
	 * deserialization context, with the configured BigDecimal setting applied.
	 * @return the new token buffer
	 */
	private TokenBuffer createToken() {
		TokenBuffer tokenBuffer = new TokenBuffer(this.parser, this.deserializationContext);
		tokenBuffer.forceUseOfBigDecimal(this.forceUseOfBigDecimal);
		return tokenBuffer;
	}

	/**
	 * Determine whether the token opens or closes the outermost array. Must be
	 * called after {@link #updateDepth(JsonToken)}: a START_ARRAY then shows an
	 * array depth of 1 and the matching END_ARRAY an array depth of 0.
	 * @param token the token just read from the parser
	 * @return {@code true} if the token is the start or end of the top-level array
	 */
	private boolean isTopLevelArrayToken(JsonToken token) {
		return this.objectDepth == 0 && ((token == JsonToken.START_ARRAY && this.arrayDepth == 1) ||
				(token == JsonToken.END_ARRAY && this.arrayDepth == 0));
	}

	/**
	 * Enforce the byte limit across consecutive chunks that do not complete a
	 * token buffer. The running count is reset whenever a chunk yields at least
	 * one token buffer; otherwise the chunk size is added to it. No check is
	 * performed if {@code maxInMemorySize} is negative.
	 * @param currentBufferSize the number of bytes in the chunk just fed
	 * @param result the token buffers produced by that chunk
	 * @throws DataBufferLimitException if the running count would overflow an
	 * {@code int} or exceeds the limit
	 */
	private void assertInMemorySize(int currentBufferSize, List<TokenBuffer> result) {
		if (this.maxInMemorySize >= 0) {
			if (!result.isEmpty()) {
				this.byteCount = 0;
			}
			else if (currentBufferSize > Integer.MAX_VALUE - this.byteCount) {
				// Adding would overflow the int counter, so the limit is exceeded for sure
				raiseLimitException();
			}
			else {
				this.byteCount += currentBufferSize;
				if (this.byteCount > this.maxInMemorySize) {
					raiseLimitException();
				}
			}
		}
	}

	/**
	 * Throw the exception that reports the configured byte limit as exceeded.
	 * @throws DataBufferLimitException always
	 */
	private void raiseLimitException() {
		throw new DataBufferLimitException(
				"Exceeded limit on max bytes per JSON object: " + this.maxInMemorySize);
	}


	/**
	 * Tokenize the given {@code Flux<DataBuffer>} into {@code Flux<TokenBuffer>}.
	 * @param dataBuffers the source data buffers
	 * @param jsonFactory the factory to use
	 * @param objectMapper the current mapper instance
	 * @param tokenizeArrays if {@code true} and the "top level" JSON object is
	 * an array, each element is returned individually immediately after it is received
	 * @param forceUseOfBigDecimal if {@code true}, any floating point values encountered
	 * in source will use {@link java.math.BigDecimal}
	 * @param maxInMemorySize the maximum number of bytes that may be fed without
	 * a token buffer being produced, or a negative value for no limit
	 * @return the resulting token buffers, or an error {@code Flux} if the
	 * parser cannot be created
	 */
	public static Flux<TokenBuffer> tokenize(Flux<DataBuffer> dataBuffers, JsonFactory jsonFactory,
			ObjectMapper objectMapper, boolean tokenizeArrays, boolean forceUseOfBigDecimal, int maxInMemorySize) {

		try {
			JsonParser parser = jsonFactory.createNonBlockingByteArrayParser();
			DeserializationContext context = objectMapper.getDeserializationContext();
			if (context instanceof DefaultDeserializationContext) {
				// Derive a context instance that is bound to this parser and the mapper's config
				context = ((DefaultDeserializationContext) context).createInstance(
						objectMapper.getDeserializationConfig(), parser, objectMapper.getInjectableValues());
			}
			Jackson2Tokenizer tokenizer =
					new Jackson2Tokenizer(parser, context, tokenizeArrays, forceUseOfBigDecimal, maxInMemorySize);
			// Tokenize chunk by chunk in order, then flush what remains at end of input
			return dataBuffers.concatMapIterable(tokenizer::tokenize).concatWith(tokenizer.endOfInput());
		}
		catch (IOException ex) {
			return Flux.error(ex);
		}
	}

}

相关信息

spring 源码目录

相关文章

spring AbstractJackson2Decoder 源码

spring AbstractJackson2Encoder 源码

spring Jackson2CodecSupport 源码

spring Jackson2JsonDecoder 源码

spring Jackson2JsonEncoder 源码

spring Jackson2SmileDecoder 源码

spring Jackson2SmileEncoder 源码

spring KotlinSerializationJsonDecoder 源码

spring KotlinSerializationJsonEncoder 源码

spring package-info 源码

0  赞