spring JettyXhrTransport 源码

  • 2022-08-08
  • 浏览 (328)

spring JettyXhrTransport 代码

文件路径:/spring-websocket/src/main/java/org/springframework/web/socket/sockjs/client/JettyXhrTransport.java

/*
 * Copyright 2002-2022 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.web.socket.sockjs.client;

import java.io.ByteArrayOutputStream;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Enumeration;
import java.util.concurrent.CompletableFuture;

import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.StringRequestContent;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;

import org.springframework.context.Lifecycle;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatusCode;
import org.springframework.http.ResponseEntity;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.StreamUtils;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.sockjs.SockJsException;
import org.springframework.web.socket.sockjs.SockJsTransportFailureException;
import org.springframework.web.socket.sockjs.frame.SockJsFrame;

/**
 * An XHR transport based on Jetty's {@link org.eclipse.jetty.client.HttpClient}.
 *
 * <p>When used for testing purposes (e.g. load testing) the {@code HttpClient}
 * properties must be set to allow a larger than usual number of connections and
 * threads. For example:
 *
 * <pre class="code">
 * HttpClient httpClient = new HttpClient();
 * httpClient.setMaxConnectionsPerDestination(1000);
 * httpClient.setExecutor(new QueuedThreadPool(500));
 * </pre>
 *
 * @author Rossen Stoyanchev
 * @since 4.1
 */
public class JettyXhrTransport extends AbstractXhrTransport implements Lifecycle {

	private final HttpClient httpClient;


	public JettyXhrTransport(HttpClient httpClient) {
		Assert.notNull(httpClient, "'httpClient' is required");
		this.httpClient = httpClient;
	}


	public HttpClient getHttpClient() {
		return this.httpClient;
	}

	@Override
	public void start() {
		try {
			if (!this.httpClient.isRunning()) {
				this.httpClient.start();
			}
		}
		catch (Exception ex) {
			throw new SockJsException("Failed to start JettyXhrTransport", ex);
		}
	}

	@Override
	public void stop() {
		try {
			if (this.httpClient.isRunning()) {
				this.httpClient.stop();
			}
		}
		catch (Exception ex) {
			throw new SockJsException("Failed to stop JettyXhrTransport", ex);
		}
	}

	@Override
	public boolean isRunning() {
		return this.httpClient.isRunning();
	}


	@Override
	protected void connectInternal(TransportRequest transportRequest, WebSocketHandler handler,
			URI url, HttpHeaders handshakeHeaders, XhrClientSockJsSession session,
			CompletableFuture<WebSocketSession> connectFuture) {

		HttpHeaders httpHeaders = transportRequest.getHttpRequestHeaders();
		SockJsResponseListener listener = new SockJsResponseListener(url, httpHeaders, session, connectFuture);
		executeReceiveRequest(url, handshakeHeaders, listener);
	}

	private void executeReceiveRequest(URI url, HttpHeaders headers, SockJsResponseListener listener) {
		if (logger.isTraceEnabled()) {
			logger.trace("Starting XHR receive request, url=" + url);
		}
		Request httpRequest = this.httpClient.newRequest(url).method(HttpMethod.POST);
		addHttpHeaders(httpRequest, headers);
		httpRequest.send(listener);
	}

	@Override
	protected ResponseEntity<String> executeInfoRequestInternal(URI infoUrl, HttpHeaders headers) {
		return executeRequest(infoUrl, HttpMethod.GET, headers, null);
	}

	@Override
	public ResponseEntity<String> executeSendRequestInternal(URI url, HttpHeaders headers, TextMessage message) {
		return executeRequest(url, HttpMethod.POST, headers, message.getPayload());
	}

	protected ResponseEntity<String> executeRequest(URI url, HttpMethod method,
			HttpHeaders headers, @Nullable String body) {

		Request httpRequest = this.httpClient.newRequest(url).method(method);
		addHttpHeaders(httpRequest, headers);
		if (body != null) {
			httpRequest.body(new StringRequestContent(body));
		}
		ContentResponse response;
		try {
			response = httpRequest.send();
		}
		catch (Exception ex) {
			throw new SockJsTransportFailureException("Failed to execute request to " + url, ex);
		}
		HttpStatusCode status = HttpStatusCode.valueOf(response.getStatus());
		HttpHeaders responseHeaders = toHttpHeaders(response.getHeaders());
		return (response.getContent() != null ?
				new ResponseEntity<>(response.getContentAsString(), responseHeaders, status) :
				new ResponseEntity<>(responseHeaders, status));
	}


	private static void addHttpHeaders(Request request, HttpHeaders headers) {
		request.headers(fields -> {
			headers.forEach((key, values) -> {
				for (String value : values) {
					fields.add(key, value);
				}
			});
		});
	}

	private static HttpHeaders toHttpHeaders(HttpFields httpFields) {
		HttpHeaders responseHeaders = new HttpHeaders();
		Enumeration<String> names = httpFields.getFieldNames();
		while (names.hasMoreElements()) {
			String name = names.nextElement();
			Enumeration<String> values = httpFields.getValues(name);
			while (values.hasMoreElements()) {
				String value = values.nextElement();
				responseHeaders.add(name, value);
			}
		}
		return responseHeaders;
	}


	/**
	 * Jetty client {@link org.eclipse.jetty.client.api.Response.Listener Response
	 * Listener} that splits the body of the response into SockJS frames and
	 * delegates them to the {@link XhrClientSockJsSession}.
	 */
	private class SockJsResponseListener extends Response.Listener.Adapter {

		private final URI transportUrl;

		private final HttpHeaders receiveHeaders;

		private final XhrClientSockJsSession sockJsSession;

		private final CompletableFuture<WebSocketSession> connectFuture;

		private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

		public SockJsResponseListener(URI url, HttpHeaders headers,	XhrClientSockJsSession sockJsSession,
				CompletableFuture<WebSocketSession> connectFuture) {

			this.transportUrl = url;
			this.receiveHeaders = headers;
			this.connectFuture = connectFuture;
			this.sockJsSession = sockJsSession;
		}

		@Override
		public void onBegin(Response response) {
			if (response.getStatus() != 200) {
				HttpStatusCode status = HttpStatusCode.valueOf(response.getStatus());
				response.abort(new HttpServerErrorException(status, "Unexpected XHR receive status"));
			}
		}

		@Override
		public void onHeaders(Response response) {
			if (logger.isTraceEnabled()) {
				// Convert to HttpHeaders to avoid "\n"
				logger.trace("XHR receive headers: " + toHttpHeaders(response.getHeaders()));
			}
		}

		@Override
		public void onContent(Response response, ByteBuffer buffer) {
			while (true) {
				if (this.sockJsSession.isDisconnected()) {
					if (logger.isDebugEnabled()) {
						logger.debug("SockJS sockJsSession closed, closing response.");
					}
					response.abort(new SockJsException("Session closed.", this.sockJsSession.getId(), null));
					return;
				}
				if (buffer.remaining() == 0) {
					break;
				}
				int b = buffer.get();
				if (b == '\n') {
					handleFrame();
				}
				else {
					this.outputStream.write(b);
				}
			}
		}

		private void handleFrame() {
			String content = StreamUtils.copyToString(this.outputStream, SockJsFrame.CHARSET);
			this.outputStream.reset();
			if (logger.isTraceEnabled()) {
				logger.trace("XHR content received: " + content);
			}
			if (!PRELUDE.equals(content)) {
				this.sockJsSession.handleFrame(content);
			}
		}

		@Override
		public void onSuccess(Response response) {
			if (this.outputStream.size() > 0) {
				handleFrame();
			}
			if (logger.isTraceEnabled()) {
				logger.trace("XHR receive request completed.");
			}
			executeReceiveRequest(this.transportUrl, this.receiveHeaders, this);
		}

		@Override
		public void onFailure(Response response, Throwable failure) {
			if (this.connectFuture.completeExceptionally(failure)) {
				return;
			}
			if (this.sockJsSession.isDisconnected()) {
				this.sockJsSession.afterTransportClosed(null);
			}
			else {
				this.sockJsSession.handleTransportError(failure);
				this.sockJsSession.afterTransportClosed(new CloseStatus(1006, failure.getMessage()));
			}
		}
	}

}

相关信息

spring 源码目录

相关文章

spring AbstractClientSockJsSession 源码

spring AbstractXhrTransport 源码

spring DefaultTransportRequest 源码

spring InfoReceiver 源码

spring RestTemplateXhrTransport 源码

spring SockJsClient 源码

spring SockJsUrlInfo 源码

spring Transport 源码

spring TransportRequest 源码

spring UndertowXhrTransport 源码

0  赞