dubbo NettyClient 源码
dubbo NettyClient 代码
文件路径:/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyClient.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.transport.netty;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.transport.AbstractClient;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_CLIENT_CONNECT_TIMEOUT;
import static org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CONNECT_PROVIDER;
/**
* NettyClient.
*/
public class NettyClient extends AbstractClient {
private static final ErrorTypeAwareLogger logger = LoggerFactory.getErrorTypeAwareLogger(NettyClient.class);
// ChannelFactory's closure has a DirectMemory leak, using static to avoid
// https://issues.jboss.org/browse/NETTY-424
private static final ChannelFactory CHANNEL_FACTORY = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientBoss", true)),
Executors.newCachedThreadPool(new NamedThreadFactory("NettyClientWorker", true)),
Constants.DEFAULT_IO_THREADS);
private ClientBootstrap bootstrap;
private volatile Channel channel; // volatile, please copy reference to use
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
super(url, wrapChannelHandler(url, handler));
}
@Override
protected void doOpen() throws Throwable {
NettyHelper.setNettyLoggerFactory();
bootstrap = new ClientBootstrap(CHANNEL_FACTORY);
// config
// @see org.jboss.netty.channel.socket.SocketChannelConfig
bootstrap.setOption("keepAlive", true);
bootstrap.setOption("tcpNoDelay", true);
bootstrap.setOption("connectTimeoutMillis", getConnectTimeout());
final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
@Override
public ChannelPipeline getPipeline() {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("decoder", adapter.getDecoder());
pipeline.addLast("encoder", adapter.getEncoder());
pipeline.addLast("handler", nettyHandler);
return pipeline;
}
});
}
@Override
protected void doConnect() throws Throwable {
long start = System.currentTimeMillis();
ChannelFuture future = bootstrap.connect(getConnectAddress());
try {
boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS);
if (ret && future.isSuccess()) {
Channel newChannel = future.getChannel();
newChannel.setInterestOps(Channel.OP_READ_WRITE);
try {
// Close old channel
Channel oldChannel = NettyClient.this.channel; // copy reference
if (oldChannel != null) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close old netty channel " + oldChannel + " on create new netty channel " + newChannel);
}
oldChannel.close();
} finally {
NettyChannel.removeChannelIfDisconnected(oldChannel);
}
}
} finally {
if (NettyClient.this.isClosed()) {
try {
if (logger.isInfoEnabled()) {
logger.info("Close new netty channel " + newChannel + ", because the client closed.");
}
newChannel.close();
} finally {
NettyClient.this.channel = null;
NettyChannel.removeChannelIfDisconnected(newChannel);
}
} else {
NettyClient.this.channel = newChannel;
}
}
} else if (future.getCause() != null) {
Throwable cause = future.getCause();
RemotingException remotingException = new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + ", error message is:" + cause.getMessage(), cause);
// 6-1 - Failed to connect to provider server by other reason.
logger.error(TRANSPORT_FAILED_CONNECT_PROVIDER, "network disconnected", "", "Failed to connect to provider server by other reason.", cause);
throw remotingException;
} else {
RemotingException remotingException = new RemotingException(this, "client(url: " + getUrl() + ") failed to connect to server "
+ getRemoteAddress() + " client-side timeout "
+ getConnectTimeout() + "ms (elapsed: " + (System.currentTimeMillis() - start) + "ms) from netty client "
+ NetUtils.getLocalHost() + " using dubbo version " + Version.getVersion());
// 6-2 - Client-side timeout.
logger.error(TRANSPORT_CLIENT_CONNECT_TIMEOUT, "provider crash", "", "Client-side timeout.", remotingException);
throw remotingException;
}
} finally {
if (!isConnected()) {
future.cancel();
}
}
}
@Override
protected void doDisConnect() throws Throwable {
try {
NettyChannel.removeChannelIfDisconnected(channel);
} catch (Throwable t) {
logger.warn(t.getMessage());
}
}
@Override
protected void doClose() throws Throwable {
/*try {
bootstrap.releaseExternalResources();
} catch (Throwable t) {
logger.warn(t.getMessage());
}*/
}
@Override
protected org.apache.dubbo.remoting.Channel getChannel() {
Channel c = channel;
if (c == null || !c.isConnected()) {
return null;
}
return NettyChannel.getOrAddChannel(c, getUrl(), this);
}
Channel getNettyChannel() {
return channel;
}
}
相关信息
相关文章
dubbo NettyBackedChannelBuffer 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
8、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦