集群容错
- Invoker: Provider 的一个可调用 Service 的抽象,Invoker 封装了 Provider 地址及 Service 接口信息
- Directory: 代表多个 Invoker,可以把它看成 List
,但与 List 不同的是,它的值可能是动态变化的,比如注册中心推送变更 - Cluster: 将 Directory 中的多个 Invoker 伪装成一个 Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个
- Router:负责从多个 Invoker 中按路由规则选出子集,比如读写分离,应用隔离等
- LoadBalance:负责从多个 Invoker 中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后,需要重选
Mock机制
可以通过服务降级功能临时屏蔽某个出错的非关键服务,并定义降级后的返回策略。
向注册中心写入动态配置覆盖规则:
1 | RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension(); |
- mock=force:return+null 表示消费方对该服务的方法调用都直接返回 null 值,不发起远程调用。用来屏蔽不重要服务不可用时对调用方的影响。
- mock=fail:return+null 表示消费方对该服务的方法调用在失败后,再返回 null 值,不抛异常。用来容忍不重要服务不稳定时对调用方的影响。
1 | <dubbo:service interface="com.foo.BarService" mock="true" /> |
本地伪装
通常用于服务降级,比如某验权服务,当服务提供方全部挂掉后,客户端不抛出异常,而是通过 Mock 数据返回授权失败
1 | <dubbo:service interface="com.foo.BarService" mock="com.foo.BarServiceMock" /> |
Cluster->MockClusterWrapper
调用端mock的执行主要由MockClusterInvoker完成,MockClusterInvoker的invoke方法的执行逻辑如下
- (1):如果在没有配置之中没有设置mock,那么直接把方法调用转发给实际的Invoker(也就是FailoverClusterInvoker)
- (2):如果配置了强制执行Mock,比如发生服务降级,那么直接按照配置执行mock之后返回
(3):如果是其它的情况,比如只是配置的是mock=fail:return null,那么就是在正常的调用出现异常的时候按照配置执行mock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
if (value.length() == 0 || value.equalsIgnoreCase("false")){
//no mock
result = this.invoker.invoke(invocation);
} else if (value.startsWith("force")) {
if (logger.isWarnEnabled()) {
logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
}
//force:direct mock
result = doMockInvoke(invocation, null);
} else {
//fail-mock
try {
result = this.invoker.invoke(invocation);
}catch (RpcException e) {
if (e.isBiz()) {
throw e;
} else {
if (logger.isWarnEnabled()) {
logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
}
result = doMockInvoke(invocation, e);
}
}
}(4):通过(2)和(3)最终都会通过调用doMockInvoke来完成mock调用
doMockInvoke方法会首先尝试调用selectMockInvoker方法来看看用户有没有配置过MockInvoker1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23private Result doMockInvoke(Invocation invocation,RpcException e){
Result result = null;
Invoker<T> minvoker ;
List<Invoker<T>> mockInvokers = selectMockInvoker(invocation);
if (mockInvokers == null || mockInvokers.size() == 0){
minvoker = (Invoker<T>) new MockInvoker(directory.getUrl());
} else {
minvoker = mockInvokers.get(0);
}
try {
result = minvoker.invoke(invocation);
} catch (RpcException me) {
if (me.isBiz()) {
result = new RpcResult(me.getCause());
} else {
throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause());
}
} catch (Throwable me) {
throw new RpcException(getMockExceptionMessage(e, me), me.getCause());
}
return result;
}selectMockInvoker的代码如下依靠在Invocation的attachment里面做个标记来告诉directory的list方法应该返回MockInvoker
1
2
3
4
5
6
7
8
9
10
11private List<Invoker<T>> selectMockInvoker(Invocation invocation){
if (invocation instanceof RpcInvocation){
//存在隐含契约(虽然在接口声明中增加描述,但扩展性会存在问题.同时放在attachement中的做法需要改进
((RpcInvocation)invocation).setAttachment(Constants.INVOCATION_NEED_MOCK, Boolean.TRUE.toString());
//directory根据invocation中attachment是否有Constants.INVOCATION_NEED_MOCK,来判断获取的是normal invokers or mock invokers
List<Invoker<T>> invokers = directory.list(invocation);
return invokers;
} else {
return null ;
}
}directory是RegistryDirectory,RegistryDirectory的list方法里面与mock有关的部分主要是router
selectMockInvoker一般返回的都是空,所以要用MockInvoker构造的进行invoke调用
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51public Result invoke(Invocation invocation) throws RpcException {
String mock = getUrl().getParameter(invocation.getMethodName()+"."+Constants.MOCK_KEY);
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(this);
}
if (StringUtils.isBlank(mock)){
mock = getUrl().getParameter(Constants.MOCK_KEY);
}
if (StringUtils.isBlank(mock)){
throw new RpcException(new IllegalAccessException("mock can not be null. url :" + url));
}
mock = normallizeMock(URL.decode(mock));
if (Constants.RETURN_PREFIX.trim().equalsIgnoreCase(mock.trim())){
//如果没有mock=return关键字
RpcResult result = new RpcResult();
result.setValue(null);
return result;
} else if (mock.startsWith(Constants.RETURN_PREFIX)) {
//mock关键字中有return则返回具体的return value
mock = mock.substring(Constants.RETURN_PREFIX.length()).trim();
mock = mock.replace('`', '"');
try {
Type[] returnTypes = RpcUtils.getReturnTypes(invocation);
Object value = parseMockValue(mock, returnTypes);
return new RpcResult(value);
} catch (Exception ew) {
throw new RpcException("mock return invoke error. method :" + invocation.getMethodName() + ", mock:" + mock + ", url: "+ url , ew);
}
} else if (mock.startsWith(Constants.THROW_PREFIX)) {
//mock关键字中有throw则抛异常
mock = mock.substring(Constants.THROW_PREFIX.length()).trim();
mock = mock.replace('`', '"');
if (StringUtils.isBlank(mock)){
throw new RpcException(" mocked exception for Service degradation. ");
} else { //用户自定义类
Throwable t = getThrowable(mock);
throw new RpcException(RpcException.BIZ_EXCEPTION, t);
}
} else { //impl mock
//如果是一个本地伪装实现类则调用这个类实现的对象
try {
Invoker<T> invoker = getInvoker(mock);
return invoker.invoke(invocation);
} catch (Throwable t) {
throw new RpcException("Failed to create mock implemention class " + mock , t);
}
}
}
Router->MockInvokersRouter
由上面的directory#list
函数过来route方法
RegistryDirectory在初始化内部的routers的时候,会人为的加上一个MockInvokerRouter
setRouters
增加MockInvokersRouter
1 | AbstractDirectory#setRouters |
AbstractDirectory#list
RegistryDirectory的list方法最后由router来对invokers进行处理
1 | public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers,URL url, final Invocation invocation) throws RpcException{ |
getMockInvokers
一般的情况下,是不会配置mock协议的,所以这个方法返回null
1 | private <T> List<Invoker<T>> getMockInvokers(final List<Invoker<T>> invokers) { |
Directory
Directory分RegistryDirectory
和StaticDirectory
RegistryDirectory
notify方法
为NotifyListener
中的注册中心的回调能根据注册中心动态变化的根源所在
doList 服务读操作
1 | public List<Invoker<T>> doList(Invocation invocation) { |
引入来源this.methodInvokerMap
notify 服务写操作
注册中心有变化,则notify监听被执行
更新methodInvokerMap和urlInvokerMap的值1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19notify
|
|
|
refreshInvoker
|
|
|
toInvokers拿newUrlInvokerMap
|
|
|
toMethodInvokers拿newMethodInvokerMap
|
|
|
变更成员变量
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;
StaticDirectory
StaticDirectory的Invoker是通过构造函数传入不支持对注册中心的服务引用
Cluster
集群模式配置,按照以下示例在服务提供方和消费方配置集群模式
1 | <dubbo:service cluster="failsafe" /> |
MockClusterWrapper
本地伪装通常用于服务降级
比如某验权服务,当服务提供方全部挂掉后,客户端不抛出异常,而是通过 Mock 数据返回授权失败
由于是Wrapper每次产生的Cluster后都会由这个类包裹
1 | <dubbo:service interface="com.foo.BarService" mock="true" /> |
1 | <dubbo:service interface="com.foo.BarService" mock="return null" /> |
Mergeable Cluster
分组聚合:按组合并返回结果
比如菜单服务接口一样,但有多种实现用group区分,现在消费方需从每种group中调用一次返回结果,合并结果返回,这样就可以实现聚合菜单项。
RegistryProtocol#refer
中有group的判断,如果有Cluster
则为MergeableCluster
1 | public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { |
Failover Cluster
失败自动切换,当出现失败,重试其它服务器 1。通常用于读操作,但重试会带来更长延迟。可通过 retries=”2” 来设置重试次数(不含第一次)。
重试次数配置如下:1
2
3
4
5
6<dubbo:service retries="2" />
<dubbo:reference retries="2" />
<dubbo:reference>
<dubbo:method name="findFoo" retries="2" />
</dubbo:reference>
Failfast Cluster
快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。
1 | Invoker<T> invoker = select(loadbalance, invocation, invokers, null); |
代码如上,遇到调用失败进入catch直接快速失败
Failsafe Cluster
失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。
1 | try { |
Failback Cluster
失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。
failback-cluster-timer
线程池用于定时重发
1 | //启动一个failback-cluster-timer线程池 |
Forking Cluster
并行调用多个服务器,只要一个成功即返回。
通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks=”2” 来设置最大并行数。
forking-cluster-timer
线程池用于并发,forks
调用默认为2(invokers的length长度为2)
1 | private final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("forking-cluster-timer", true)); |
Broadcast Cluster
广播调用所有提供者,逐个调用,任意一台报错则报错。
通常用于通知所有提供者更新缓存或日志等本地资源信息。
1 | for (Invoker<T> invoker: invokers) { |
Route
有三个实现类,分别是ConditionRouter,MockInvokersSelector,ScriptRouter
route两处使用
- 第一处为notify写入invoker的时候利用了ConditionRoute进行一个条件筛选
- 第二处为invoker.invoke()执行调用的时候,list读取invoker的时候利用MockInvokersSelector进行一个mock包装
MockInvokersSelector
list:调用的时候读取invoker
1 | router.getUrl() == null 选定MockInvokersSelector |
- MockedInvokers:url中带有mock的提供者
- NormalInvokers:url中带没有mock的提供者
ConditionRouter(条件路由)
条件路由主要就是根据dubbo管理控制台配置的路由规则来过滤相关的invoker
当我们对路由规则点击启用的时候,就会触发RegistryDirectory类的notify
方法中
notify:写入invoker的时候会引入ConditionRouter进行
1 | router.getUrl() != null判断过滤掉MockInvokersSelector |
1 |
|
Loadbalance
RoundRobin LoadBalance
随机,按权重设置随机概率。
在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。
不同权重值
1 | 服务A,B,C各个权重值: 1, 2, 3 |
相同权重值
1 | 服务A,B,C各个权重值: 1, 3, 3 |
Random LoadBalance
轮循,按公约后的权重设置轮循比率。
存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。
不同权重值
1 | 服务A,B,C各个权重值为 1, 2, 3 |
相同权重值
1 | 服务A,B,C各个权重值为1, 3 ,3 |
LeastActive LoadBalance
最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。
使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。
每个服务维护一个活跃数计数器。当A机器开始处理请求,该计数器加1,此时A还未处理完成。若处理完毕则计数器减1。而B机器接受到请求后很快处理完毕。那么A,B的活跃数分别是1,0。当又产生了一个新的请求,则选择B机器去执行(B活跃数最小),这样使慢的机器A收到少的请求。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 服务A,B,C各个权重值为 1, 2, 3
1:遍历invoker从RpcStatus获取active活跃数以及当前invoker的权重weight
2:遍历判断,如遇到active比之前记录的leastActive(最小活跃数)还小
则刷新leastActive的值还有leastCount最小活跃数invoker的数量以及leastIndexs相关的invokers里面的下标
以及把当前的权重成为totalWeight
3:遍历判断,如遇到跟之前leastActive一致的则直接添加leastIndexs和leastActive递增
并且叠加totalWeight,以及表示是否还有相同权重的情况sameWeight
4:如果没有遇到相同活跃数的直接调用
5:如果遇到相同活跃数但是权重不等的
random.nextInt(totalWeight) 随机值,然后以随机轮询的方法处理
即随机值 -= 每个invoker的权重weight 如果小于 0 即刻调用
6:如果遇到相同活跃数与相同权重值
invokers.get(leastIndexs[random.nextInt(leastCount)])
在活跃数列表中长度随机调用
ConsistentHash LoadBalance
一致性 Hash,相同参数的请求总是发到同一提供者。
当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
算法参见:http://en.wikipedia.org/wiki/Consistent_hashing
缺省只对第一个参数 Hash,如果要修改,请配置1
<dubbo:parameter key="hash.arguments" value="0,1" />
缺省用 160 份虚拟节点,如果要修改,请配置1
<dubbo:parameter key="hash.nodes" value="320" />
SPI扩展-ExtensionLoader
变量说明
实例变量根据你的服务接口变化
类变量整个JVM共享
变量名 | 类型 | 功能 | 示例 |
---|---|---|---|
EXTENSION_LOADERS | 类变量 | 各个接口下对象加载器实例 | key为接口Class,value为ExtensionLoader |
EXTENSION_INSTANCES | 类变量 | 各个接口实现类 | key为实现类的Class,value为Object实现对象 |
cachedClasses | 实例变量 | 缓存从META-INF拿到的所有实现类 | 一个hodler,value为Map,Map[“name”->Class] |
cachedNames | 实例变量 | 缓存从META-INF拿到的所有实现类 | 一个ConcurrentMap,key为Class,value为name |
cachedInstances | 实例变量 | holder缓存,holder的value | key为name,value为hodler,hodler的value为目标对象 |
cachedAdaptiveInstance | 实例变量 | holder value缓存XX@Adaptive 相关对象实例 |
Holder[value=Xxx@Adaptive] |
cachedActivates | 实例变量 | 缓存activate相关的信息 | 一个Map,key为META-INF的name,value为@Activate 注解到Class上的注解信息 |
功能说明
- getActivateExtension:根据注解条件结合Key获取当前扩展可自动激活的实现
例如@Activate(group = Constants.PROVIDER),只给生产者端
URL中又指定key,@Activate(value=”限定key的value值”) - getExtension:直接根据名称获取当前扩展的指定实现
- getAdaptiveExtension : 所有类都会变成XX@Adaptive,根据获取URL的参数自适应拿对象,未指定取@SPI的name
获取实例-操作流程
1 | //Adaptive |
每个接口都会实例化一个ExtensionLoader
,为这个接口提供附属的关联对象类
如果之前有实例化过会缓存到EXTENSION_LOADERS
,后期直接从这个变量拿
首次实例化某个接口的ExtensionLoader
,附带会实例化ExtensionFactory
的加载器1
2
3
4
5private ExtensionLoader(Class<?> type) {
this.type = type;
objectFactory = (type == ExtensionFactory.class ? null :
ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}
当实例化ExtensionFactory
的时候,代码type == ExtensionFactory.class
会生效直接返回null
接下来会先走ExtensionFactory
的getAdaptiveExtension
生成AdaptiveExtensionFactory
然后走你相关的类加载器的getAdaptiveExtension
生成Xxxx接口@Adaptive
具体的getAdaptiveExtension
代码会有解释
Adaptive
面向用户操作
1 | ExtensionLoader.getExtensionLoader(你的接口.class).getAdaptiveExtension(); |
根据你的@Adaptive({"key"})
,在构建URL中的map参数中你注解指定的key
的value
,来寻找META-INF
对应的name
的实现类
示例
1 | "man") ( |
Activate
面向用户操作
1 | List<Animal> animals = ExtensionLoader.getExtensionLoader(Animal.class).getActivateExtension( |
key
指定key后map中用你指定的key添加value,这个value后期用作@Activate(value="你之前map中指定的value")
group
可消费者可生产者
示例
1 |
|
Extension
直接指定META-INF中的name对应实现类,进行关联和获取扩展实现类1
ExtensionLoader.getExtensionLoader(你的接口.class).getExtension("你的name");
示例
1 | public class Cat implements Animal{ |
加载流程
取相关实现类
Extension
getExtension
1 | public T getExtension(String name) { |
createExtension
从getExtensionClasses
中的cachedClasses
拿取相关的缓存实现类
拿到Class后去EXTENSION_INSTANCES
取这个全局变量内是否有此Class的对象
有则返回,无则clazz.newInstance()
实例化
查询此接口是否有wrapperClasses
wrapperClasses
:即构造函数是否为本接口的Class,有则为wrapperClasses
如果有wrapperClasses
相关的类,则将上一轮实例化后的对象再放入此构造器被wrapperClass
包裹
最后进行一次injectExtension
进行处理
injectExtension
:查是否有set函数,有且只有一个参数,且public修饰符则进行自动注入
1 | private T createExtension(String name) { |
injectExtension
判断是否有set函数且参数为1个且为public
如果有则进行自动注入
1 | private T injectExtension(T instance) { |
AdaptiveExtension
getAdaptiveExtension
从cachedAdaptiveInstance
拿,如不存在则创建,cachedAdaptiveInstance
为实例变量不共享,根据你的接口而变
1 | public T getAdaptiveExtension() { |
createAdaptiveExtension
进行创建工作,获取相关类,执行实例化
- getAdaptiveExtensionClass()
获取相关的
Xxx@Adaptive
对象的类 - newInstance()
进行实例化操作
1 | private T createAdaptiveExtension() { |
getAdaptiveExtensionClass
获取相关Adaptive类信息,如果不存在则由此去创建
不存在判断:META-INF关联的类Class级别注解没有@Adaptive
1
2
3
4
5
6
7 private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
createAdaptiveExtensionClass
由此将创建
Xxx@Adaptive
对象信息
寻找合适的ClassLoader
利用compiler为JavassistCompiler去执行编译加载内存
1
2
3
4
5
6 private Class<?> createAdaptiveExtensionClass() {
String code = createAdaptiveExtensionClassCode();
ClassLoader classLoader = findClassLoader();
com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}
createAdaptiveExtensionClassCode
主要去拼装相关的Class字节信息
它主要找有没有@Adaptive
注解修饰的方法,如果有将会修饰成如下的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219 public 返回类型 方法名(com.alibaba.dubbo.common.URL arg0) {
if (arg0 == null)
throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg0;
String extName = ( url.getProtocol() == null ? "你设定的@SPI默认值" : url.getProtocol() );
if(extName == null)
throw new IllegalStateException("Fail to get extension(你的接口) name from url(" + url.toString() + ") use keys([protocol])");
你的接口 extension = (你的接口)ExtensionLoader.getExtensionLoader(你的接口).getExtension(extName);
return extension.方法名(arg0);
}
//以下是源代码
private String createAdaptiveExtensionClassCode() {
StringBuilder codeBuidler = new StringBuilder();
Method[] methods = type.getMethods();
boolean hasAdaptiveAnnotation = false;
for(Method m : methods) {
if(m.isAnnotationPresent(Adaptive.class)) {
hasAdaptiveAnnotation = true;
break;
}
}
// 完全没有Adaptive方法,则不需要生成Adaptive类
if(! hasAdaptiveAnnotation)
throw new IllegalStateException("No adaptive method on extension " + type.getName() + ", refuse to create the adaptive class!");
codeBuidler.append("package " + type.getPackage().getName() + ";");
codeBuidler.append("\nimport " + ExtensionLoader.class.getName() + ";");
codeBuidler.append("\npublic class " + type.getSimpleName() + "$Adpative" + " implements " + type.getCanonicalName() + " {");
for (Method method : methods) {
Class<?> rt = method.getReturnType();
Class<?>[] pts = method.getParameterTypes();
Class<?>[] ets = method.getExceptionTypes();
Adaptive adaptiveAnnotation = method.getAnnotation(Adaptive.class);
StringBuilder code = new StringBuilder(512);
if (adaptiveAnnotation == null) {
code.append("throw new UnsupportedOperationException(\"method ")
.append(method.toString()).append(" of interface ")
.append(type.getName()).append(" is not adaptive method!\");");
} else {
int urlTypeIndex = -1;
for (int i = 0; i < pts.length; ++i) {
if (pts[i].equals(URL.class)) {
urlTypeIndex = i;
break;
}
}
// 有类型为URL的参数
if (urlTypeIndex != -1) {
// Null Point check
String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"url == null\");",
urlTypeIndex);
code.append(s);
s = String.format("\n%s url = arg%d;", URL.class.getName(), urlTypeIndex);
code.append(s);
}
// 参数没有URL类型
else {
String attribMethod = null;
// 找到参数的URL属性
LBL_PTS:
for (int i = 0; i < pts.length; ++i) {
Method[] ms = pts[i].getMethods();
for (Method m : ms) {
String name = m.getName();
if ((name.startsWith("get") || name.length() > 3)
&& Modifier.isPublic(m.getModifiers())
&& !Modifier.isStatic(m.getModifiers())
&& m.getParameterTypes().length == 0
&& m.getReturnType() == URL.class) {
urlTypeIndex = i;
attribMethod = name;
break LBL_PTS;
}
}
}
if(attribMethod == null) {
throw new IllegalStateException("fail to create adative class for interface " + type.getName()
+ ": not found url parameter or url attribute in parameters of method " + method.getName());
}
// Null point check
String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"%s argument == null\");",
urlTypeIndex, pts[urlTypeIndex].getName());
code.append(s);
s = String.format("\nif (arg%d.%s() == null) throw new IllegalArgumentException(\"%s argument %s() == null\");",
urlTypeIndex, attribMethod, pts[urlTypeIndex].getName(), attribMethod);
code.append(s);
s = String.format("%s url = arg%d.%s();",URL.class.getName(), urlTypeIndex, attribMethod);
code.append(s);
}
String[] value = adaptiveAnnotation.value();
// 没有设置Key,则使用“扩展点接口名的点分隔 作为Key
if(value.length == 0) {
char[] charArray = type.getSimpleName().toCharArray();
StringBuilder sb = new StringBuilder(128);
for (int i = 0; i < charArray.length; i++) {
if(Character.isUpperCase(charArray[i])) {
if(i != 0) {
sb.append(".");
}
sb.append(Character.toLowerCase(charArray[i]));
}
else {
sb.append(charArray[i]);
}
}
value = new String[] {sb.toString()};
}
boolean hasInvocation = false;
for (int i = 0; i < pts.length; ++i) {
if (pts[i].getName().equals("com.alibaba.dubbo.rpc.Invocation")) {
// Null Point check
String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"invocation == null\");", i);
code.append(s);
s = String.format("\nString methodName = arg%d.getMethodName();", i);
code.append(s);
hasInvocation = true;
break;
}
}
String defaultExtName = cachedDefaultName;
String getNameCode = null;
for (int i = value.length - 1; i >= 0; --i) {
if(i == value.length - 1) {
if(null != defaultExtName) {
if(!"protocol".equals(value[i]))
if (hasInvocation)
getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
else
getNameCode = String.format("url.getParameter(\"%s\", \"%s\")", value[i], defaultExtName);
else
getNameCode = String.format("( url.getProtocol() == null ? \"%s\" : url.getProtocol() )", defaultExtName);
}
else {
if(!"protocol".equals(value[i]))
if (hasInvocation)
getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
else
getNameCode = String.format("url.getParameter(\"%s\")", value[i]);
else
getNameCode = "url.getProtocol()";
}
}
else {
if(!"protocol".equals(value[i]))
if (hasInvocation)
getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
else
getNameCode = String.format("url.getParameter(\"%s\", %s)", value[i], getNameCode);
else
getNameCode = String.format("url.getProtocol() == null ? (%s) : url.getProtocol()", getNameCode);
}
}
code.append("\nString extName = ").append(getNameCode).append(";");
// check extName == null?
String s = String.format("\nif(extName == null) " +
"throw new IllegalStateException(\"Fail to get extension(%s) name from url(\" + url.toString() + \") use keys(%s)\");",
type.getName(), Arrays.toString(value));
code.append(s);
s = String.format("\n%s extension = (%<s)%s.getExtensionLoader(%s.class).getExtension(extName);",
type.getName(), ExtensionLoader.class.getSimpleName(), type.getName());
code.append(s);
// return statement
if (!rt.equals(void.class)) {
code.append("\nreturn ");
}
s = String.format("extension.%s(", method.getName());
code.append(s);
for (int i = 0; i < pts.length; i++) {
if (i != 0)
code.append(", ");
code.append("arg").append(i);
}
code.append(");");
}
codeBuidler.append("\npublic " + rt.getCanonicalName() + " " + method.getName() + "(");
for (int i = 0; i < pts.length; i ++) {
if (i > 0) {
codeBuidler.append(", ");
}
codeBuidler.append(pts[i].getCanonicalName());
codeBuidler.append(" ");
codeBuidler.append("arg" + i);
}
codeBuidler.append(")");
if (ets.length > 0) {
codeBuidler.append(" throws ");
for (int i = 0; i < ets.length; i ++) {
if (i > 0) {
codeBuidler.append(", ");
}
codeBuidler.append(pts[i].getCanonicalName());
}
}
codeBuidler.append(" {");
codeBuidler.append(code.toString());
codeBuidler.append("\n}");
}
codeBuidler.append("\n}");
if (logger.isDebugEnabled()) {
logger.debug(codeBuidler.toString());
}
return codeBuidler.toString();
}
ActivateExtension
getActivateExtension
根据传进来的参数,拿取url
中指定的key
对应的value
,如果value
为空直接返回null
所以使用ActivateExtension
必须要指定key
和value
1
2
3
4public List<T> getActivateExtension(URL url, String key, String group) {
String value = url.getParameter(key);
return getActivateExtension(url, value == null || value.length() == 0 ? null : Constants.COMMA_SPLIT_PATTERN.split(value), group);
}
拿到value
后进入以下方法,并附带了url
与group
主要做了以下步骤
getExtensionClasses:获取当前加载器相关的Class信息,并且领出
cachedActivates
信息,cachedActivates
为当前加载器下的满足以下条件就会加入1
2
3
4Activate activate = clazz.getAnnotation(Activate.class);
if (activate != null) {
cachedActivates.put(names[0], activate);
}isMatchGroup:匹配group组是否符合,即consumer或provider,符合组即
getExtension(name);
取出对象信息!names.contains(name) && isActive(….): 判断key是否相符,相符则加入到
exts
,如果未指定注解value
也被加入进去META-INF中的name如果带有”-“标识则表现出为移除状态
! names.contains(name)
先排除掉让没有key指定的对象先添加到list再排序list
后期再加T ext = getExtension(name);
对ext对象直接添加到exts
列表,保证了调用的先后顺序
1 | public List<T> getActivateExtension(URL url, String[] values, String group) { |
加载META-INF接口相关实现类
getExtensionClasses
如果当前缓存类持有容器不存在相关的Map(name->实现类),则进行加载流程
1 | private Map<String, Class<?>> getExtensionClasses() { |
loadExtensionClasses
组织加载流程三个目录
从
dir
目标目录中加载接口-实现类配置文件
dir
目录有这三类:
- SERVICES_DIRECTORY = “META-INF/services/“
- DUBBO_DIRECTORY = “META-INF/dubbo/“
- DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + “internal/“
这些目录的文件样式如下:1
monitor=com.alibaba.dubbo.monitor.support.MonitorFilter
1 | private Map<String, Class<?>> loadExtensionClasses() { |
loadFile
extensionClasses
为成员变量cachedClasses
以下是主要加载文件的源代码
1 | private void loadFile(Map<String, Class<?>> extensionClasses, String dir) { |
以上代码经历了以下主要步骤
从主要三个目录加载文件
1 | SERVICES_DIRECTORY = "META-INF/services/" |
分解name,line两部分
1 | while 循环每一行 |
设定加cachedAdaptiveClass
1 | clazz.isAnnotationPresent(Adaptive.class) |
添加cachedActivates
1 | Activate activate = clazz.getAnnotation(Activate.class); |
添加cachedNames,extensionClasses
1 | //最后将类放入cachedNames,extensionClasses |
服务-Consumer,Producer
消费者
消费者-服务引用
流程序列图
大致流程
ReferenceBean
实现FactoryBean
的getObject
方法进入由
DubboNamespaceHandler
解析registerBeanDefinitionParser
,从而带入ReferenceBean
到Spring环境中
所有的消费者都将会在身为FactoryBean
的ReferenceBean
的getObject
方法拿取引用服务的Beanrefprotocol[Protocol$Adaptive].refer
通过url为registry
关键字找RegistryProtocol#refer
RegistryProtocol#refer
首先做注册registry.register
RegistryProtocol#refer
接着做注册中心订阅directory.subscribe
RegistryDirectory
往zk进行操作zookeeperRegistry.subscribe
,并注册自身的NotifyListener#notify
方法RegistryDirectory
在zk得到响应notify
方法被调用接着引出refreshInvoker
整理从注册中心得到的服务Invoker列表refreshInvoker
主要操作toInvokers
与toMethodInvokers
两个相关的方法来换取相关的InvokerstoInvokers
:1
invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
由上方代码引出
DubboProtocol
toMethodInvokers
:1
newMethodInvokerMap.put(method, route(methodInvokers, method));
由上方代码引出
route
机制
DubboProtocol#refer
主要构件DubboInvoker并且建立客户端链接getClients(url)
,并添加invokers
1
2DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);FailoverCluster
在RegistryDirectory
结束notify
之后会得到根据route
规则通过directory.list
拿到的Invokers列表,然后用FailoverClusterInvoker
包裹JavassistProxyFactory
将上一步得到的ClusterInvoker
到这里来进行getProxy
代理操作
消费者-调用
流程序列图
大致流程
InvokerInvocationHandler
由于所有的引用服务都被JavassistProxyFactory.getProxy
所以的Invoker
调用都被这个InvokerInvocationHandler
包裹MockClusterInvoker
因为有MockClusterWrapper
所以最后在FailoverCluster
上包裹了一层主要用于判断:有无mock,强制mock,失败后mock三种失败mock策略
FailoverClusterInvoker
在父级AbstractClusterInvoker
选定好后loadbalance
策略,到了本对象的doInvoke
直接激活负载策略,如遇失败重试,具体重试次数根据设定来主要用于各类容错机制
InvokerWrapper
因为在InvokerDelegete
的初始化,作为父级类包裹了invokerProtocolFilterWrapper
由于DubboProtocol
要经过Filter
责任链的洗礼,所以这里的Invoker
全部是Consumer
端的Filter
通过责任链可以添加各类扩展
ListenerExporterWrapper
由于ProtocolListenerWrapper
也是DubboProtocol
包裹类,在refer
操作中被强制包上了ListenerInvokerWrapper
实现
InvokerListener
进行invoker监听动作DubboInvoker
最终来到了这里拿取client
进行远程调用
invoker 调用细节
route.route
会用在两个地方
notify:写入invoker的时候会引入ConditionRouter进行
1
2router.getUrl() != null判断过滤掉MockInvokersSelector
ConditionRouter的matchThen进行主要的invoker过滤list:调用的时候读取invoker
1
2router.getUrl() == null 选定MockInvokersSelector
参数值`invocation.need.mock`有并且`invocation.getAttachments`不为空则返回`MockedInvokers`否则返回`NormalInvokers`
生产者
生产者-服务暴露
流程序列图
大致流程
- 配置为none不暴露
- 配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
- 如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务)
本地暴露
- proxyFactory.getInvoker:
protocol.export
Protocol$Adaptive(InJvmProtocol)
两个Wrapper:ProtocolFilterWrapper
(官方9个Filter),ProtocolListenerWrapper
(包裹ListenerExportWrapper
)
在Filter包裹后,Listener包裹之前构建InJvmExport
exporters.add(exporter);
远程暴露
- proxyFactory.getInvoker
protocol.export
Protocol$Adaptive(RegistryProtocol)
- 构建
InvokeDelegate
protocol.export(invokeDelegate)
暴露服务registry.register
将OverrideListener
注册Protocol$Adaptive(DubboProtocol)
ProtocolFilterWrapper
嵌套责任链DubboExport
将Invoke转DubboExport
openServer
开启Netty网络服务return New Export(....)
返回Export
出去
- 构建
- exporters.add(exporter);
注册中心-Zookeeper
zookeeperRegistry
FailbackRegistry
作为
ZookeeperRegistry
的父类:专注用于失败后的重试机制
起DubboRegistryFailedRetryTimer
线程池用来做定时任务
doRegister : 注册中心失败
doSubscribe : 监听注册中心失败重试
doUnsubscribe : 向服务器端发送取消订阅请求失败重试
doNotify : 将失败的通知请求记录到失败列表,定时重试
网络模型-Netty
HeaderExchangeClient/HeaderExchangeServer->NettyServer/NettyClient
NettyServer/NettyClient—>MultiMessageHandler–>HeartbeatHandler—> AllChannelHandler
[HeaderExchanger构建] DecodeHandler-> HeaderExchangeHandler->NettyHandler
消费者-网络模型
DubboInvoke
获取Client判读是否是共享client还是非共享clientExchangers
工具类,获得合适的Exchanger进行connect链接,默认为HeaderExchanger
HeaderExchanger
进行RequestHandler
参数包裹HeaderExchangeHandler
包裹第一层DecodeHandler
包裹第二层Transporters.connect
起客户端链接HeaderExchangeClient
包裹第三层,并且在初始化防范中起dubbo-remoting-client-heartbeat
心跳Transporters
工具类,根据上一轮步骤Transporters.connect
起客户端链接衍生1
getTransporter() //默认获取NettyTransporter
NettyTransporter
初始化新的NettyClient
返回回去NettyClient
wrapChannelHandler
进行包裹寻找
Dispatch
,默认AllDispatcher
实现类AllChannelHandler
包裹Url以及handler(被DecodeHandler
,HeaderExchangeHandler
包裹)在
WrappedChannelHandler
初始化中起executor(DubboClientHandler
)再初始化一个dataStore存储url->executor
HeartbeatHandler
包裹MultiMessageHandler
包裹
参数功能点-URL
屏蔽功能
dubbo管控台调控
消费者 禁止 被禁止的客户端将收到禁止访问异常。
notify的时候会带上route
的url
1 | route://0.0.0.0/cn.coderss.service.user.api.UserService?category=routers&dynamic=false&enabled=true&force=true&name=cn.coderss.service.user.api.UserService blackwhitelist&priority=0&router=condition&rule=consumer.host+%3D+192.168.3.149+%3D%3E+false&runtime=false |
“rule” -> “consumer.host+=+192.168.3.149+=>+false”
1 | RegistryDirectory#notify->refreshInvoker->toMethodInvokers->newMethodInvokerMap.put(method, route(methodInvokers, method)); |
Mock服务降级&&容错机制
注意点:初始化的时候如果
mock=force:return+null
或者mock=fail:return+null
都会失效,只有mock=retur null
才能行
因为AbstractMethodConfig#setMock
有相关的mock参数检测,只允许.数字字母
dubbo管控台调控
1 | zookeeper注册中心变更 |
导致变更以下代码
1 | //directoryUrl 与 override 合并是在notify的最后,这里不能够处理 |
调用的时候MockClusterInvoker#invoke
1 | //获取`overrideDirectoryUrl`相关的mock参数 |
mock=force:return+null
当强制返回的时候或者fail:return+null
的情况下
首先都会去RegistryDirectory#list
找是否提供相关的mock协议而不是dubbo协议的服务提供者
如果有则调用这些invokers
否则自己去组装一个mockInvoker
1 | MockInvoker#invoke |
初始化服务调控
1 | 600000, mock = "return null") (timeout = |
线程模型
业务线程模型
Dispatcher
all 除发送消息之外,其他全部使用线程池处理;包括请求,响应,连接事件,断开事件,心跳等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43public void connected(Channel channel) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try{
cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
}catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass()+" error when process connected event ." , t);
}
}
public void disconnected(Channel channel) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try{
cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED));
}catch (Throwable t) {
throw new ExecutionException("disconnect event", channel, getClass()+" error when process disconnected event ." , t);
}
}
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try{
cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception));
}catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass()+" error when process caught event ." , t);
}
}
private ExecutorService getExecutorService() {
ExecutorService cexecutor = executor;
if (cexecutor == null || cexecutor.isShutdown()) {
cexecutor = SHARED_EXECUTOR;
}
return cexecutor;
}execution 除发送消息之外,其他全部使用线程池处理;
与AllChannelHandler不同之处在于若创建的线程池ExecutorService不可用,AllChannelHandler将使用共享线程池,而ExecutionChannelHandler只有报错了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15public void connected(Channel channel) throws RemotingException {
executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
}
public void disconnected(Channel channel) throws RemotingException {
executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED));
}
public void received(Channel channel, Object message) throws RemotingException {
executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
}
public void caught(Channel channel, Throwable exception) throws RemotingException {
executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception));
}direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行。
1
2
3public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return handler;
}message 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16public ChannelHandler dispatch(ChannelHandler handler, URL url) {
return new MessageOnlyChannelHandler(handler, url);
}
......
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = executor;
if (cexecutor == null || cexecutor.isShutdown()) {
cexecutor = SHARED_EXECUTOR;
}
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}connection 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
super(handler, url);
String threadName = url.getParameter(Constants.THREAD_NAME_KEY,Constants.DEFAULT_THREAD_NAME);
connectionExecutor = new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
new NamedThreadFactory(threadName, true),
new AbortPolicyWithReport(threadName, url)
); // FIXME 没有地方释放connectionExecutor!
queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
}
public void connected(Channel channel) throws RemotingException {
try{
checkQueueLength();
connectionExecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
}catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass()+" error when process connected event ." , t);
}
}
public void disconnected(Channel channel) throws RemotingException {
try{
checkQueueLength();
connectionExecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED));
}catch (Throwable t) {
throw new ExecutionException("disconnected event", channel, getClass()+" error when process disconnected event ." , t);
}
}
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = executor;
if (cexecutor == null || cexecutor.isShutdown()) {
cexecutor = SHARED_EXECUTOR;
}
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService cexecutor = executor;
if (cexecutor == null || cexecutor.isShutdown()) {
cexecutor = SHARED_EXECUTOR;
}
try{
cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception));
}catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass()+" error when process caught event ." , t);
}
}
private void checkQueueLength(){
if (connectionExecutor.getQueue().size() > queuewarninglimit){
logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: "+connectionExecutor.getQueue().size()+" exceed the warning limit number :"+queuewarninglimit));
}
}
ThreadPool
- fixed 固定大小线程池,启动时建立线程,不关闭,一直持有。(缺省)
- cached 缓存线程池,空闲一分钟自动删除,需要时重建。
- limited 可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。
DubboRegistryFailedRetryTimer
注册中心相关 失败执行后 - 定时循环重新执行任务 - 执行器
参数retry.period
: 注册中心失败事件重试事件,即此线程的定时数,单位默认MILLISECONDS
FailbackRegistry
作为ZookeeperRegistry
的父类:专注用于失败后的重试机制
doRegister : 注册中心失败
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20//failedRegistered 数值的由来
//failedRegistered.remove(url);
//failedUnregistered.remove(url);
try {
// 向服务器端发送注册请求
doRegister(url);
} catch (Exception e) {
// 将失败的注册请求记录到失败列表,定时重试
failedRegistered.add(url);
}
//只要成功了就移除failedRegistered.remove(url);
protected void doRegister(URL url) {
try {
zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}doSubscribe : 监听注册中心失败重试
1
2
3
4
5
6
7
8
9
10failedSubscribed
//removeFailedSubscribed(url, listener);
try {
// 向服务器端发送订阅请求
doSubscribe(url, listener);
} catch (Exception e) {
// 将失败的订阅请求记录到失败列表,定时重试
addFailedSubscribed(url, listener);
}doUnsubscribe : 向服务器端发送取消订阅请求失败重试
1
2
3
4
5
6
7
8
9
10
11
12
13removeFailedSubscribed(url, listener);
try {
// 向服务器端发送取消订阅请求
doUnsubscribe(url, listener);
} catch (Exception e) {
// 将失败的取消订阅请求记录到失败列表,定时重试
Set<NotifyListener> listeners = failedUnsubscribed.get(url);
if (listeners == null) {
failedUnsubscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
listeners = failedUnsubscribed.get(url);
}
listeners.add(listener);
}doNotify : 将失败的通知请求记录到失败列表,定时重试
1
2
3
4
5
6
7
8
9
10
11
12try {
doNotify(url, listener, urls);
} catch (Exception t) {
// 将失败的通知请求记录到失败列表,定时重试
Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
if (listeners == null) {
failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
listeners = failedNotified.get(url);
}
listeners.put(listener, urls);
logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
}
forking-cluster-timer
Cluster
选用Forking Cluster
并行调用多个服务器,只要一个成功即返回
failback-cluster-timer
Cluster
选用Failback Cluster
failback-cluster-timer
线程池用于定时重发失败的调用
DubboClientHandler
NettyClient
产生线程池ChannelHandlers#wrap
将NettyClient
相关处理业务的handler
包裹在Dispatcher
1 | <dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" /> |
dubbo-remoting-client-heartbeat
HeaderExchangeClient
产生此线程池,执行HeartBeatTask
heartbeat:性能调优心跳间隔,对于长连接,当物理层断开时;比如拔网线,TCP的FIN消息来不及发送,对方收不到断开事件,此时需要心跳来帮助检查连接是否已断开
DubboClientReconnectTimer
Dubbo客户端断线重连
心跳和断线重连线程池工作职责不一样
- 心跳 : 只管发send信息到各个channelProvider,如果遇到超时则进行重连,期间正常发的时候不去管是否有断开连接
- 断线重连 : 定时判断是否连接,如遇没有连接则进行重连
1 | Runnable connectStatusCheckCommand = new Runnable() { |
DubboSaveRegistryCache
AbstractRegistry
产生,主要用于配置文件缓存定时写入
每次注册中心notify
的时候会激发saveProperties(url);
->registryCacheExecutor.execute(new SaveProperties(version));
函数
用于写入invoker的时候写入本地缓存
1 | registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet())); |
DubboServerHandler
NettyServer
产生线程池ChannelHandlers#wrap
将NettyClient
相关处理业务的handler
包裹在Dispatcher
1 | <dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" /> |
dubbo-remoting-server-heartbeat
HeaderExchangeServer
产生线程池,执行HeartBeatTask
handler包裹
ExchangerChannelHandler与ExchangeChannelHandler不一样
一个用来处理响应并回复,一个用来作为NettyClient包裹发送
服务端
1 | HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); |
客户端
1 | HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); |
国内查看评论需要代理~