Dubbo路由机制分析

Dubbo服务端接收请求及响应请求原理分析

之前一篇文章《Dubbo服务提供者发布及注册过程源码分析》已经介绍了Dubbo服务端的服务注册及发布过程,这篇文章将会介绍Dubbo服务端是如何接受请求以及响应请求的。

本文还是以Consumer-Provider的Demo为例,分析接收请求及响应请求的具体流程,在Dubbo服务端发布服务之后,它将会监听一个端口等待接收客户端的请求,当接收到请求后,会经过入站处理器进行处理,我们知道在发布服务的时候设置了NettyServerHandler入站处理器,接收到请求之后,会经过NettyServerHandler#channelRead()方法来获取请求的消息,我们来看一下它的实现:

1
2
3
4
5
6
7
8
9
10
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//以ctx.channel()为key,以NettyChannel为value,存储在ConcurrentHashMap中
NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
try {
handler.received(channel, msg);
} finally {
NettyChannel.removeChannelIfDisconnected(ctx.channel());
}
}

Dubbo Gracefully Shutdown机制分析

Dubbo集群容错机制

在之前一篇文章《Dubbo消费者调用过程源码分析》讲到,在创建代理的时候会生成调用对象invoker,这个时候就会绑定集群策略,我们来看生成invoker的代码,在类ReferenceConfig#createProxy(Map<String, String> map)方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
//直接在配置文件中配置url,实现直接通信(如果既配置了直连地址又配置了注册中心的地址,则自动忽略注册中心的地址)
if (url != null && url.length() > 0) { // user specified URL, could be peer-to-peer address, or register center's address.
String[] us = Constants.SEMICOLON_SPLIT_PATTERN.split(url);
if (us != null && us.length > 0) {
for (String u : us) {
URL url = URL.valueOf(u);
if (url.getPath() == null || url.getPath().length() == 0) {
url = url.setPath(interfaceName);
}
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
urls.add(url.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
} else {
urls.add(ClusterUtils.mergeUrl(url, map));
}
}
}
} else { // assemble URL from register center's configuration
//获取配置注册中心的url(可以有多个注册中心的url)
List<URL> us = loadRegistries(false);
if (us != null && !us.isEmpty()) {
for (URL u : us) {
URL monitorUrl = loadMonitor(u);
if (monitorUrl != null) {
map.put(Constants.MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
urls.add(u.addParameterAndEncoded(Constants.REFER_KEY, StringUtils.toQueryString(map)));
}
}
//如果urls为空,则抛出异常
if (urls.isEmpty()) {
throw new IllegalStateException("No such any registry to reference " + interfaceName + " on the consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please config <dubbo:registry address=\"...\" /> to your spring config.");
}
}
//当urls的长度为一时,可能为服务的直连地址也可能为注册中心的地址
if (urls.size() == 1) {
invoker = refprotocol.refer(interfaceClass, urls.get(0));
} else {//有多个直连地址,或者多个注册中心的地址
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
//获取每个url对应的invoker
invokers.add(refprotocol.refer(interfaceClass, url));
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
//获取最后一个注册中心的地址
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// use AvailableCluster only when register's cluster is available
//当有注册中心的地址时,第一层使用AvailableCluster集群策略,第二层使用默认的集群策略
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));
} else { // not a registry url
//如果没有注册中心地址,则使用默认的集群策略
invoker = cluster.join(new StaticDirectory(invokers));
}
}

我们总结出三种生成集群策略的入口,当urls的长度为1时,此时该url可能为注册中心的地址也可能是服务的直连地址,则进一步执行invoker = refprotocol.refer(interfaceClass, urls.get(0));;当urls的长度不为1时,此时可能为多个注册中心的地址或者多个服务直连地址,当为多个注册中心的地址时,会执行:

1
2
URL u = registryURL.addParameter(Constants.CLUSTER_KEY, AvailableCluster.NAME);
invoker = cluster.join(new StaticDirectory(u, invokers));

当为多个直连地址时,则执行:invoker = cluster.join(new StaticDirectory(invokers));,下面将对这三种方式进行详细的分析。

Dubbo负载均衡策略分析

Dubbo提供了四种负载均衡策略:Random LoadBalance(加权随机负载均衡)、RoundRobin LoadBalance(加权轮询负载均衡)、LeastActive LoadBalance(最少活跃数负载均衡)、ConsistentHash LoadBalance(一致性hash负载均衡),下面将分别分析这四种负载均衡策略的源码。

Random LoadBalance

先来看一下RandomLoadBalance类的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public class RandomLoadBalance extends AbstractLoadBalance {

public static final String NAME = "random";

@Override
protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
int length = invokers.size(); // Number of invokers
int totalWeight = 0; // The sum of weights
boolean sameWeight = true; // Every invoker has the same weight?
for (int i = 0; i < length; i++) {
//获取权重
int weight = getWeight(invokers.get(i), invocation);
//权重累积和
totalWeight += weight; // Sum
//记录所有的invokers的weight是否是一样的
if (sameWeight && i > 0
&& weight != getWeight(invokers.get(i - 1), invocation)) {
sameWeight = false;
}
}
//如果不是所有的invoker权重都一样
if (totalWeight > 0 && !sameWeight) {
// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
//获取随机数的范围是[0, totalWeight)
int offset = ThreadLocalRandom.current().nextInt(totalWeight);
// Return a invoker based on the random value.
for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
//返回invoker
return invokers.get(i);
}
}
}
//如果所有的invoker都是一样的weight,则直接获取随机数,并返回
//这里使用了ThreadLocalRandom做了优化
// If all invokers have the same weight value or totalWeight=0, return evenly.
return invokers.get(ThreadLocalRandom.current().nextInt(length));
}

}

随机加权轮询算法还是比较容易理解的,下面继续分析RoundRobin LoadBalance。

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×