Dubbo 源码解析

集群容错

image-20180803143006952

  • Invoker: Provider 的一个可调用 Service 的抽象,Invoker 封装了 Provider 地址及 Service 接口信息
  • Directory: 代表多个 Invoker,可以把它看成 List,但与 List 不同的是,它的值可能是动态变化的,比如注册中心推送变更
  • Cluster: 将 Directory 中的多个 Invoker 伪装成一个 Invoker,对上层透明,伪装过程包含了容错逻辑,调用失败后,重试另一个
  • Router:负责从多个 Invoker 中按路由规则选出子集,比如读写分离,应用隔离等
  • LoadBalance:负责从多个 Invoker 中选出具体的一个用于本次调用,选的过程包含了负载均衡算法,调用失败后,需要重选

Mock机制

可以通过服务降级功能临时屏蔽某个出错的非关键服务,并定义降级后的返回策略。
向注册中心写入动态配置覆盖规则:

1
2
3
RegistryFactory registryFactory = ExtensionLoader.getExtensionLoader(RegistryFactory.class).getAdaptiveExtension();
Registry registry = registryFactory.getRegistry(URL.valueOf("zookeeper://10.20.153.10:2181"));
registry.register(URL.valueOf("override://0.0.0.0/com.foo.BarService?category=configurators&dynamic=false&application=foo&mock=force:return+null"));
  • mock=force:return+null 表示消费方对该服务的方法调用都直接返回 null 值,不发起远程调用。用来屏蔽不重要服务不可用时对调用方的影响。
  • mock=fail:return+null 表示消费方对该服务的方法调用在失败后,再返回 null 值,不抛异常。用来容忍不重要服务不稳定时对调用方的影响。
1
2
<dubbo:service interface="com.foo.BarService" mock="true" />
<dubbo:service interface="com.foo.BarService" mock="return null" />

本地伪装

通常用于服务降级,比如某验权服务,当服务提供方全部挂掉后,客户端不抛出异常,而是通过 Mock 数据返回授权失败

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<dubbo:service interface="com.foo.BarService" mock="com.foo.BarServiceMock" />

package com.foo;
public class BarServiceMock implements BarService {
public String sayHello(String name) {
// 你可以伪造容错数据,此方法只在出现RpcException时被执行
return "容错数据";
}
}


MockClusterInvoker->doMockInvoke->selectMockInvoker
一般情况下selectMockInvoker都为null
所以由MockInvoker包裹最终由MockInvoker#invoker

//impl mock
//如果是一个本地伪装实现类则调用这个类实现的对象
try {
Invoker<T> invoker = getInvoker(mock);
return invoker.invoke(invocation);
} catch (Throwable t) {
throw new RpcException("Failed to create mock implemention class " + mock , t);
}

Cluster->MockClusterWrapper

调用端mock的执行主要由MockClusterInvoker完成,MockClusterInvoker的invoke方法的执行逻辑如下

  • (1):如果在没有配置之中没有设置mock,那么直接把方法调用转发给实际的Invoker(也就是FailoverClusterInvoker)
  • (2):如果配置了强制执行Mock,比如发生服务降级,那么直接按照配置执行mock之后返回
  • (3):如果是其它的情况,比如只是配置的是mock=fail:return null,那么就是在正常的调用出现异常的时候按照配置执行mock

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); 
    if (value.length() == 0 || value.equalsIgnoreCase("false")){
    //no mock
    result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {
    if (logger.isWarnEnabled()) {
    logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
    }
    //force:direct mock
    result = doMockInvoke(invocation, null);
    } else {
    //fail-mock
    try {
    result = this.invoker.invoke(invocation);
    }catch (RpcException e) {
    if (e.isBiz()) {
    throw e;
    } else {
    if (logger.isWarnEnabled()) {
    logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
    }
    result = doMockInvoke(invocation, e);
    }
    }
    }
  • (4):通过(2)和(3)最终都会通过调用doMockInvoke来完成mock调用
    doMockInvoke方法会首先尝试调用selectMockInvoker方法来看看用户有没有配置过MockInvoker

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    private Result doMockInvoke(Invocation invocation,RpcException e){
    Result result = null;
    Invoker<T> minvoker ;

    List<Invoker<T>> mockInvokers = selectMockInvoker(invocation);
    if (mockInvokers == null || mockInvokers.size() == 0){
    minvoker = (Invoker<T>) new MockInvoker(directory.getUrl());
    } else {
    minvoker = mockInvokers.get(0);
    }
    try {
    result = minvoker.invoke(invocation);
    } catch (RpcException me) {
    if (me.isBiz()) {
    result = new RpcResult(me.getCause());
    } else {
    throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause());
    }
    } catch (Throwable me) {
    throw new RpcException(getMockExceptionMessage(e, me), me.getCause());
    }
    return result;
    }
  • selectMockInvoker的代码如下依靠在Invocation的attachment里面做个标记来告诉directory的list方法应该返回MockInvoker

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    private List<Invoker<T>> selectMockInvoker(Invocation invocation){
    if (invocation instanceof RpcInvocation){
    //存在隐含契约(虽然在接口声明中增加描述,但扩展性会存在问题.同时放在attachement中的做法需要改进
    ((RpcInvocation)invocation).setAttachment(Constants.INVOCATION_NEED_MOCK, Boolean.TRUE.toString());
    //directory根据invocation中attachment是否有Constants.INVOCATION_NEED_MOCK,来判断获取的是normal invokers or mock invokers
    List<Invoker<T>> invokers = directory.list(invocation);
    return invokers;
    } else {
    return null ;
    }
    }

    directory是RegistryDirectory,RegistryDirectory的list方法里面与mock有关的部分主要是router

  • selectMockInvoker一般返回的都是空,所以要用MockInvoker构造的进行invoke调用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    public Result invoke(Invocation invocation) throws RpcException {
    String mock = getUrl().getParameter(invocation.getMethodName()+"."+Constants.MOCK_KEY);
    if (invocation instanceof RpcInvocation) {
    ((RpcInvocation) invocation).setInvoker(this);
    }
    if (StringUtils.isBlank(mock)){
    mock = getUrl().getParameter(Constants.MOCK_KEY);
    }

    if (StringUtils.isBlank(mock)){
    throw new RpcException(new IllegalAccessException("mock can not be null. url :" + url));
    }

    mock = normallizeMock(URL.decode(mock));

    if (Constants.RETURN_PREFIX.trim().equalsIgnoreCase(mock.trim())){
    //如果没有mock=return关键字
    RpcResult result = new RpcResult();
    result.setValue(null);
    return result;
    } else if (mock.startsWith(Constants.RETURN_PREFIX)) {
    //mock关键字中有return则返回具体的return value
    mock = mock.substring(Constants.RETURN_PREFIX.length()).trim();
    mock = mock.replace('`', '"');
    try {
    Type[] returnTypes = RpcUtils.getReturnTypes(invocation);
    Object value = parseMockValue(mock, returnTypes);
    return new RpcResult(value);
    } catch (Exception ew) {
    throw new RpcException("mock return invoke error. method :" + invocation.getMethodName() + ", mock:" + mock + ", url: "+ url , ew);
    }
    } else if (mock.startsWith(Constants.THROW_PREFIX)) {
    //mock关键字中有throw则抛异常
    mock = mock.substring(Constants.THROW_PREFIX.length()).trim();
    mock = mock.replace('`', '"');
    if (StringUtils.isBlank(mock)){
    throw new RpcException(" mocked exception for Service degradation. ");
    } else { //用户自定义类
    Throwable t = getThrowable(mock);
    throw new RpcException(RpcException.BIZ_EXCEPTION, t);
    }
    } else { //impl mock
    //如果是一个本地伪装实现类则调用这个类实现的对象
    try {
    Invoker<T> invoker = getInvoker(mock);
    return invoker.invoke(invocation);
    } catch (Throwable t) {
    throw new RpcException("Failed to create mock implemention class " + mock , t);
    }
    }
    }

Router->MockInvokersRouter

由上面的directory#list函数过来route方法
RegistryDirectory在初始化内部的routers的时候,会人为的加上一个MockInvokerRouter

setRouters

增加MockInvokersRouter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
AbstractDirectory#setRouters
protected void setRouters(List<Router> routers){
// copy list
routers = routers == null ? new ArrayList<Router>() : new ArrayList<Router>(routers);
// append url router
String routerValue = url.getParameter(Constants.ROUTER_KEY);
if (routerValue != null && routerValue.length() > 0){
RouterFactory routerFactory =
ExtensionLoader.getExtensionLoader(RouterFactory.class).getExtension(routerValue);
routers.add(routerFactory.getRouter(url));
}
// append mock invoker selector
routers.add(new MockInvokersRouter());
Collections.sort(routers);
this.routers = routers;
}
AbstractDirectory#list

RegistryDirectory的list方法最后由router来对invokers进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public <T> List<Invoker<T>> route(final List<Invoker<T>> invokers,URL url, final Invocation invocation) throws RpcException{  
if (invocation.getAttachments() == null){
return getNormalInvokers(invokers);
}
else{
String value = invocation.getAttachments().get(Constants.INVOCATION_NEED_MOCK);
if (value == null){
return getNormalInvokers(invokers);
}
else if (Boolean.TRUE.toString().equalsIgnoreCase(value)){
return getMockInvokers(invokers);
}
}
return invokers;
}
getMockInvokers

一般的情况下,是不会配置mock协议的,所以这个方法返回null

1
2
3
4
5
6
7
8
9
10
11
12
private <T> List<Invoker<T>> getMockInvokers(final List<Invoker<T>> invokers)  {  
if (! hasMockProviders(invokers)){
return null;
}
List<Invoker<T>> resultInvokers = new ArrayList<Invoker<T>>(1);
for (Invoker<T> invoker : invokers){
if (invoker.getUrl().getProtocol().equals(Constants.MOCK_PROTOCOL)){
resultInvokers.add(invoker);
}
}
return resultInvokers;
}

Directory

Directory分RegistryDirectoryStaticDirectory

RegistryDirectory

notify方法NotifyListener中的注册中心的回调能根据注册中心动态变化的根源所在

doList 服务读操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public List<Invoker<T>> doList(Invocation invocation) {
if (forbidden) {
throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "Forbid consumer " + NetUtils.getLocalHost() + " access service " + getInterface().getName() + " from registry " + getUrl().getAddress() + " use dubbo version " + Version.getVersion() + ", Please check registry access list (whitelist/blacklist).");
}
List<Invoker<T>> invokers = null;
Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; // local reference
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
String methodName = RpcUtils.getMethodName(invocation);
Object[] args = RpcUtils.getArguments(invocation);
if(args != null && args.length > 0 && args[0] != null
&& (args[0] instanceof String || args[0].getClass().isEnum())) {
invokers = localMethodInvokerMap.get(methodName + "." + args[0]); // 可根据第一个参数枚举路由
}
if(invokers == null) {
invokers = localMethodInvokerMap.get(methodName);
}
if(invokers == null) {
invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
}
if(invokers == null) {
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
invokers = iterator.next();
}
}
}
return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}

引入来源this.methodInvokerMap

notify 服务写操作

注册中心有变化,则notify监听被执行

更新methodInvokerMap和urlInvokerMap的值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
notify
|
|
|
refreshInvoker
|
|
|
toInvokers拿newUrlInvokerMap
|
|
|
toMethodInvokers拿newMethodInvokerMap
|
|
|
变更成员变量
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
this.urlInvokerMap = newUrlInvokerMap;

StaticDirectory

StaticDirectory的Invoker是通过构造函数传入不支持对注册中心的服务引用

Cluster

集群模式配置,按照以下示例在服务提供方和消费方配置集群模式

1
2
<dubbo:service cluster="failsafe" />
<dubbo:reference cluster="failsafe" />

MockClusterWrapper

本地伪装通常用于服务降级
比如某验权服务,当服务提供方全部挂掉后,客户端不抛出异常,而是通过 Mock 数据返回授权失败

由于是Wrapper每次产生的Cluster后都会由这个类包裹

1
2
3
4
5
6
7
8
9
<dubbo:service interface="com.foo.BarService" mock="true" />
<dubbo:service interface="com.foo.BarService" mock="com.foo.BarServiceMock" />

package com.foo;
public class BarServiceMock implements BarService {
public String sayHello(String name) {
return "容错数据";
}
}
1
2
<dubbo:service interface="com.foo.BarService" mock="return null" />
//直接返回null

Mergeable Cluster

分组聚合:按组合并返回结果
比如菜单服务接口一样,但有多种实现用group区分,现在消费方需从每种group中调用一次返回结果,合并结果返回,这样就可以实现聚合菜单项。

RegistryProtocol#refer中有group的判断,如果有Cluster则为MergeableCluster

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY);
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}

// group="a,b" or group="*"
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY));
String group = qs.get(Constants.GROUP_KEY);
if (group != null && group.length() > 0 ) {
if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1
|| "*".equals( group ) ) {
return doRefer( getMergeableCluster(), registry, type, url );
}
}
return doRefer(cluster, registry, type, url);
}

Failover Cluster

失败自动切换,当出现失败,重试其它服务器 1。通常用于读操作,但重试会带来更长延迟。可通过 retries=”2” 来设置重试次数(不含第一次)。

重试次数配置如下:

1
2
3
4
5
6
<dubbo:service retries="2" />
<dubbo:reference retries="2" />

<dubbo:reference>
<dubbo:method name="findFoo" retries="2" />
</dubbo:reference>

Failfast Cluster

快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。

1
2
3
4
5
6
7
8
9
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
try {
return invoker.invoke(invocation);
} catch (Throwable e) {
if (e instanceof RpcException && ((RpcException)e).isBiz()) { // biz exception.
throw (RpcException) e;
}
throw new RpcException(.....);
}

代码如上,遇到调用失败进入catch直接快速失败

Failsafe Cluster

失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。

1
2
3
4
5
6
7
8
try {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failsafe ignore exception: " + e.getMessage(), e);
return new RpcResult(); // ignore
}

Failback Cluster

失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。

failback-cluster-timer线程池用于定时重发

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
//启动一个failback-cluster-timer线程池  
private final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2, new NamedThreadFactory("failback-cluster-timer", true));

try {
checkInvokers(invokers, invocation);
Invoker<T> invoker = select(loadbalance, invocation, invokers, null);
return invoker.invoke(invocation);
} catch (Throwable e) {
logger.error("Failback to invoke method " + invocation.getMethodName() + ", wait for retry in background. Ignored exception: "
+ e.getMessage() + ", ", e);
addFailed(invocation, this);
return new RpcResult(); // ignore
}


private void addFailed(Invocation invocation, AbstractClusterInvoker<?> router) {
if (retryFuture == null) {
synchronized (this) {
if (retryFuture == null) {
retryFuture = scheduledExecutorService.scheduleWithFixedDelay(new Runnable() {

public void run() {
// 收集统计信息
try {
retryFailed();
} catch (Throwable t) { // 防御性容错
logger.error("Unexpected error occur at collect statistic", t);
}
}
}, RETRY_FAILED_PERIOD, RETRY_FAILED_PERIOD, TimeUnit.MILLISECONDS);
}
}
}
failed.put(invocation, router);
}


void retryFailed() {
if (failed.size() == 0) {
return;
}
for (Map.Entry<Invocation, AbstractClusterInvoker<?>> entry : new HashMap<Invocation, AbstractClusterInvoker<?>>(failed).entrySet()) {
Invocation invocation = entry.getKey();
Invoker<?> invoker = entry.getValue();
try {
invoker.invoke(invocation);
failed.remove(invocation);
} catch (Throwable e) {
logger.error("Failed retry to invoke method " + invocation.getMethodName() + ", waiting again.", e);
}
}
}

Forking Cluster

并行调用多个服务器,只要一个成功即返回。
通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks=”2” 来设置最大并行数。

forking-cluster-timer线程池用于并发,forks调用默认为2(invokers的length长度为2)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("forking-cluster-timer", true));

selected = new ArrayList<Invoker<T>>();
for (int i = 0; i < forks; i++) {
//在invoker列表(排除selected)后,如果没有选够,则存在重复循环问题.见select实现.
Invoker<T> invoker = select(loadbalance, invocation, invokers, selected);
if(!selected.contains(invoker)){//防止重复添加invoker
selected.add(invoker);
}
}

for (final Invoker<T> invoker : selected) {
executor.execute(new Runnable() {
public void run() {
try {
Result result = invoker.invoke(invocation);
ref.offer(result);
} catch(Throwable e) {
int value = count.incrementAndGet();
if (value >= selected.size()) {
ref.offer(e);
}
}
}
});
}

Broadcast Cluster

广播调用所有提供者,逐个调用,任意一台报错则报错。
通常用于通知所有提供者更新缓存或日志等本地资源信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
for (Invoker<T> invoker: invokers) {
try {
result = invoker.invoke(invocation);
} catch (RpcException e) {
exception = e;
logger.warn(e.getMessage(), e);
} catch (Throwable e) {
exception = new RpcException(e.getMessage(), e);
logger.warn(e.getMessage(), e);
}
}
if (exception != null) {
throw exception;
}

Route

有三个实现类,分别是ConditionRouter,MockInvokersSelector,ScriptRouter

route两处使用

  • 第一处为notify写入invoker的时候利用了ConditionRoute进行一个条件筛选
  • 第二处为invoker.invoke()执行调用的时候,list读取invoker的时候利用MockInvokersSelector进行一个mock包装

MockInvokersSelector

list:调用的时候读取invoker

1
2
3
router.getUrl() == null 选定MockInvokersSelector  
参数值`invocation.need.mock`有
并且`invocation.getAttachments`不为空则返回`MockedInvokers`否则返回`NormalInvokers`
  • MockedInvokers:url中带有mock的提供者
  • NormalInvokers:url中带没有mock的提供者

ConditionRouter(条件路由)

条件路由主要就是根据dubbo管理控制台配置的路由规则来过滤相关的invoker
当我们对路由规则点击启用的时候,就会触发RegistryDirectory类的notify方法中

notify:写入invoker的时候会引入ConditionRouter进行

1
2
router.getUrl() != null判断过滤掉MockInvokersSelector
ConditionRouter的matchThen进行主要的invoker过滤
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29

//---->notify
if (routerUrls != null && routerUrls.size() >0 ){
List<Router> routers = toRouters(routerUrls);
if(routers != null){ // null - do nothing
setRouters(routers);
}
}

//---->refreshInvoker---->toMethodInvokers
newMethodInvokerMap.put(method, route(methodInvokers, method));


private List<Invoker<T>> route(List<Invoker<T>> invokers, String method) {
Invocation invocation = new RpcInvocation(method, new Class<?>[0], new Object[0]);
List<Router> routers = getRouters();
if (routers != null) {
for (Router router : routers) {
if (router.getUrl() != null && ! router.getUrl().getParameter(Constants.RUNTIME_KEY, true)) {
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
}
}
return invokers;
}


//主要做条件路由的入口
router.route(invokers, getConsumerUrl(), invocation);

Loadbalance

RoundRobin LoadBalance

随机,按权重设置随机概率。
在一个截面上碰撞的概率高,但调用量越大分布越均匀,而且按概率使用权重后也比较均匀,有利于动态调整提供者权重。

不同权重值

1
2
3
4
服务A,B,C各个权重值: 1, 2, 3
1:维护调用次数自增值
2:调用次数取模计算 (i % maxWeight) 最大的权重值
3:选择取模数为列表下标作为选中invoker

相同权重值

1
2
3
4
服务A,B,C各个权重值: 1, 3, 3
1:维护调用次数自增值
2:调用次数取模计算 (i % maxWeight) 最大的权重值
3:如遇到相同权重, 再次取模运算( i % length) length为相同权重值的长度

Random LoadBalance

轮循,按公约后的权重设置轮循比率。
存在慢的提供者累积请求的问题,比如:第二台机器很慢,但没挂,当请求调到第二台时就卡在那,久而久之,所有请求都卡在调到第二台上。

不同权重值

1
2
3
4
5
6
7
8
9
10
11
12
服务A,B,C各个权重值为 1, 2, 3
1:取出总权重值(1+2+3)
2:random.next(总权重值)
3:随机出来的数字 -= 将服务权重从小到大排序 如果小于 0 即可调用

int offset = random.nextInt(totalWeight);
for (int i = 0; i < length; i++) {
offset -= getWeight(invokers.get(i), invocation);
if (offset < 0) {
return invokers.get(i);
}
}

相同权重值

1
2
3
4
服务A,B,C各个权重值为1, 3 ,3
1:首先会去判断sameWeight 相等权重会不会出现
2:如遇出现直接random.next(length) length为所有invoker的数量
3:随机出来的直接调用

LeastActive LoadBalance

最少活跃调用数,相同活跃数的随机,活跃数指调用前后计数差。
使慢的提供者收到更少请求,因为越慢的提供者的调用前后计数差会越大。

每个服务维护一个活跃数计数器。当A机器开始处理请求,该计数器加1,此时A还未处理完成。若处理完毕则计数器减1。而B机器接受到请求后很快处理完毕。那么A,B的活跃数分别是1,0。当又产生了一个新的请求,则选择B机器去执行(B活跃数最小),这样使慢的机器A收到少的请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
服务A,B,C各个权重值为 1, 2, 3
1:遍历invoker从RpcStatus获取active活跃数以及当前invoker的权重weight

2:遍历判断,如遇到active比之前记录的leastActive(最小活跃数)还小
则刷新leastActive的值还有leastCount最小活跃数invoker的数量以及leastIndexs相关的invokers里面的下标
以及把当前的权重成为totalWeight

3:遍历判断,如遇到跟之前leastActive一致的则直接添加leastIndexs和leastActive递增
并且叠加totalWeight,以及表示是否还有相同权重的情况sameWeight

4:如果没有遇到相同活跃数的直接调用

5:如果遇到相同活跃数但是权重不等的
random.nextInt(totalWeight) 随机值,然后以随机轮询的方法处理
即随机值 -= 每个invoker的权重weight 如果小于 0 即刻调用


6:如果遇到相同活跃数与相同权重值
invokers.get(leastIndexs[random.nextInt(leastCount)])
在活跃数列表中长度随机调用

ConsistentHash LoadBalance

一致性 Hash,相同参数的请求总是发到同一提供者。
当某一台提供者挂时,原本发往该提供者的请求,基于虚拟节点,平摊到其它提供者,不会引起剧烈变动。
算法参见:http://en.wikipedia.org/wiki/Consistent_hashing
缺省只对第一个参数 Hash,如果要修改,请配置

1
<dubbo:parameter key="hash.arguments" value="0,1" />

缺省用 160 份虚拟节点,如果要修改,请配置

1
<dubbo:parameter key="hash.nodes" value="320" />






SPI扩展-ExtensionLoader

变量说明

实例变量根据你的服务接口变化
类变量整个JVM共享

变量名 类型 功能 示例
EXTENSION_LOADERS 类变量 各个接口下对象加载器实例 key为接口Class,value为ExtensionLoader
EXTENSION_INSTANCES 类变量 各个接口实现类 key为实现类的Class,value为Object实现对象
cachedClasses 实例变量 缓存从META-INF拿到的所有实现类 一个hodler,value为Map,Map[“name”->Class]
cachedNames 实例变量 缓存从META-INF拿到的所有实现类 一个ConcurrentMap,key为Class,value为name
cachedInstances 实例变量 holder缓存,holder的value key为name,value为hodler,hodler的value为目标对象
cachedAdaptiveInstance 实例变量 holder value缓存XX@Adaptive相关对象实例 Holder[value=Xxx@Adaptive]
cachedActivates 实例变量 缓存activate相关的信息 一个Map,key为META-INF的name,value为@Activate注解到Class上的注解信息






功能说明

  • getActivateExtension:根据注解条件结合Key获取当前扩展可自动激活的实现
    例如@Activate(group = Constants.PROVIDER),只给生产者端
    URL中又指定key,@Activate(value=”限定key的value值”)
  • getExtension:直接根据名称获取当前扩展的指定实现
  • getAdaptiveExtension : 所有类都会变成XX@Adaptive,根据获取URL的参数自适应拿对象,未指定取@SPI的name

获取实例-操作流程

1
2
3
4
5
6
7
8
9
10
11
12
13
//Adaptive
Xxx接口 对象 = ExtensionLoader.getExtensionLoader(Xxx.class).getAdaptiveExtension();

//Activate
List<Xxx接口> animals = ExtensionLoader.getExtensionLoader(Xxx接口.class).getActivateExtension(
new URL("protocol", "host",
80,//port
ImmutableMap.of("key","指定你的value"))
, "key" //指定你的key
, Constants.PROVIDER); //指定你的group

//Extension
Xxx接口 对象 = ExtensionLoader.getExtensionLoader(Xxx接口.class).getExtension("META-INF的name");

每个接口都会实例化一个ExtensionLoader,为这个接口提供附属的关联对象类
如果之前有实例化过会缓存到EXTENSION_LOADERS,后期直接从这个变量拿
首次实例化某个接口的ExtensionLoader,附带会实例化ExtensionFactory的加载器

1
2
3
4
5
private ExtensionLoader(Class<?> type) {
this.type = type;
objectFactory = (type == ExtensionFactory.class ? null :
ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}

当实例化ExtensionFactory的时候,代码type == ExtensionFactory.class会生效直接返回null
接下来会先走ExtensionFactorygetAdaptiveExtension生成AdaptiveExtensionFactory
然后走你相关的类加载器的getAdaptiveExtension生成Xxxx接口@Adaptive
具体的getAdaptiveExtension 代码会有解释




Adaptive

面向用户操作

1
ExtensionLoader.getExtensionLoader(你的接口.class).getAdaptiveExtension();

根据你的@Adaptive({"key"}),在构建URL中的map参数中你注解指定的keyvalue,来寻找META-INF对应的name的实现类

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
@SPI("man")
public interface People {
@Adaptive({"key"})
public void say(URL url,String msg);

@Adaptive({"key"})
public void eat(URL url, String msg);
}

public class Man implements People{
public void say(URL url, String msg) {
System.out.println("man say " + msg);
}
public void eat(URL url, String msg) {
System.out.println("man eat " + msg);
}
}
public class Woman implements People{
public void say(URL url, String msg) {
System.out.println("woman say " + msg);
}
public void eat(URL url, String msg) {
System.out.println("woman eat " + msg);
}
}

//man=cn.coderss.service.impl.Man
//women=cn.coderss.service.impl.Woman

public class ServiceTest {
public static void main(String[] args) {
People people = ExtensionLoader.getExtensionLoader(People.class).getAdaptiveExtension();
//参数
HashMap<String,String> map = new HashMap<String, String>();
URL url = new URL("p1", "1.2.3.4", 1010, "path", map);
people.say(url, "test");
//重新构建URL
map.put("key", "women");
URL url2 = new URL("p1", "1.2.3.4", 1010, "path", map);
people.say(url2, "test");
}
}
//man say test
//woman say test




Activate

面向用户操作

1
2
3
4
5
List<Animal> animals = ExtensionLoader.getExtensionLoader(Animal.class).getActivateExtension(
new URL("p1", "1.2.3.4.",
1010,
ImmutableMap.of("key","cat"))
, "key", Constants.PROVIDER);

key指定key后map中用你指定的key添加value,这个value后期用作@Activate(value="你之前map中指定的value")
group可消费者可生产者

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@SPI
public interface Animal {
void say();
}

@Activate(group = Constants.PROVIDER,value = "cat")
public class Cat implements Animal{
@Override
public void say() {
System.out.println("cat say");
}
}
@Activate(group = Constants.PROVIDER,value = "dog")
public class Dog implements Animal{
@Override
public void say() {
System.out.println("dog say");
}
}

//dog=cn.coderss.service.impl.Dog
//cat=cn.coderss.service.impl.Cat

List<Animal> animals = ExtensionLoader.getExtensionLoader(Animal.class).getActivateExtension(
new URL("p1", "1.2.3.4.",
1010,
ImmutableMap.of("key","cat"))
, "key", Constants.PROVIDER);
//dog的实现类会被忽略
for (Animal animal:animals){
animal.say();
}




Extension

直接指定META-INF中的name对应实现类,进行关联和获取扩展实现类

1
ExtensionLoader.getExtensionLoader(你的接口.class).getExtension("你的name");

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class Cat implements Animal{
@Override
public void say() {
System.out.println("cat say");
}
}

@SPI
public interface Animal {
void say();
}

//cat=cn.coderss.service.impl.Cat

Animal cat = ExtensionLoader.getExtensionLoader(Animal.class).getExtension("cat");
cat.say();






加载流程

取相关实现类

Extension

getExtension
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public T getExtension(String name) {
if (name == null || name.length() == 0)
throw new IllegalArgumentException("Extension name == null");
if ("true".equals(name)) {
return getDefaultExtension();
}
//前方为参数检查

//检查cachedInstances是否有holder缓存
Holder<Object> holder = cachedInstances.get(name);
if (holder == null) {
cachedInstances.putIfAbsent(name, new Holder<Object>());
holder = cachedInstances.get(name);
}
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
instance = createExtension(name);
holder.set(instance);
}
}
}
return (T) instance;
}
createExtension

getExtensionClasses中的cachedClasses拿取相关的缓存实现类
拿到Class后去EXTENSION_INSTANCES取这个全局变量内是否有此Class的对象
有则返回,无则clazz.newInstance()实例化
查询此接口是否有wrapperClasses

wrapperClasses:即构造函数是否为本接口的Class,有则为wrapperClasses

如果有wrapperClasses相关的类,则将上一轮实例化后的对象再放入此构造器被wrapperClass包裹
最后进行一次injectExtension进行处理

injectExtension:查是否有set函数,有且只有一个参数,且public修饰符则进行自动注入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (wrapperClasses != null && wrapperClasses.size() > 0) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
type + ") could not be instantiated: " + t.getMessage(), t);
}
}
injectExtension

判断是否有set函数且参数为1个且为public
如果有则进行自动注入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private T injectExtension(T instance) {
try {
if (objectFactory != null) {
for (Method method : instance.getClass().getMethods()) {
if (method.getName().startsWith("set")
&& method.getParameterTypes().length == 1
&& Modifier.isPublic(method.getModifiers())) {
Class<?> pt = method.getParameterTypes()[0];
try {
String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("fail to inject via method " + method.getName()
+ " of interface " + type.getName() + ": " + e.getMessage(), e);
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}




AdaptiveExtension

getAdaptiveExtension

cachedAdaptiveInstance拿,如不存在则创建,cachedAdaptiveInstance为实例变量不共享,根据你的接口而变

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
 public T getAdaptiveExtension() {
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if(createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
}
}
}
}
else {
throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
}
}

return (T) instance;
}
createAdaptiveExtension

进行创建工作,获取相关类,执行实例化

  • getAdaptiveExtensionClass()

    获取相关的Xxx@Adaptive对象的类

  • newInstance()

    进行实例化操作

1
2
3
4
5
6
7
private T createAdaptiveExtension() {
try {
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e);
}
}
getAdaptiveExtensionClass

获取相关Adaptive类信息,如果不存在则由此去创建
不存在判断:META-INF关联的类Class级别注解没有@Adaptive

1
2
3
4
5
6
7
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

createAdaptiveExtensionClass

由此将创建Xxx@Adaptive对象信息
寻找合适的ClassLoader
利用compiler为JavassistCompiler去执行编译加载内存

1
2
3
4
5
6
private Class<?> createAdaptiveExtensionClass() {
String code = createAdaptiveExtensionClassCode();
ClassLoader classLoader = findClassLoader();
com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}

createAdaptiveExtensionClassCode

主要去拼装相关的Class字节信息
它主要找有没有@Adaptive注解修饰的方法,如果有将会修饰成如下的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
public 返回类型 方法名(com.alibaba.dubbo.common.URL arg0) {
if (arg0 == null)
throw new IllegalArgumentException("url == null");
com.alibaba.dubbo.common.URL url = arg0;
String extName = ( url.getProtocol() == null ? "你设定的@SPI默认值" : url.getProtocol() );
if(extName == null)
throw new IllegalStateException("Fail to get extension(你的接口) name from url(" + url.toString() + ") use keys([protocol])");
你的接口 extension = (你的接口)ExtensionLoader.getExtensionLoader(你的接口).getExtension(extName);
return extension.方法名(arg0);
}



//以下是源代码
private String createAdaptiveExtensionClassCode() {
StringBuilder codeBuidler = new StringBuilder();
Method[] methods = type.getMethods();
boolean hasAdaptiveAnnotation = false;
for(Method m : methods) {
if(m.isAnnotationPresent(Adaptive.class)) {
hasAdaptiveAnnotation = true;
break;
}
}
// 完全没有Adaptive方法,则不需要生成Adaptive类
if(! hasAdaptiveAnnotation)
throw new IllegalStateException("No adaptive method on extension " + type.getName() + ", refuse to create the adaptive class!");

codeBuidler.append("package " + type.getPackage().getName() + ";");
codeBuidler.append("\nimport " + ExtensionLoader.class.getName() + ";");
codeBuidler.append("\npublic class " + type.getSimpleName() + "$Adpative" + " implements " + type.getCanonicalName() + " {");

for (Method method : methods) {
Class<?> rt = method.getReturnType();
Class<?>[] pts = method.getParameterTypes();
Class<?>[] ets = method.getExceptionTypes();

Adaptive adaptiveAnnotation = method.getAnnotation(Adaptive.class);
StringBuilder code = new StringBuilder(512);
if (adaptiveAnnotation == null) {
code.append("throw new UnsupportedOperationException(\"method ")
.append(method.toString()).append(" of interface ")
.append(type.getName()).append(" is not adaptive method!\");");
} else {
int urlTypeIndex = -1;
for (int i = 0; i < pts.length; ++i) {
if (pts[i].equals(URL.class)) {
urlTypeIndex = i;
break;
}
}
// 有类型为URL的参数
if (urlTypeIndex != -1) {
// Null Point check
String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"url == null\");",
urlTypeIndex);
code.append(s);

s = String.format("\n%s url = arg%d;", URL.class.getName(), urlTypeIndex);
code.append(s);
}
// 参数没有URL类型
else {
String attribMethod = null;

// 找到参数的URL属性
LBL_PTS:
for (int i = 0; i < pts.length; ++i) {
Method[] ms = pts[i].getMethods();
for (Method m : ms) {
String name = m.getName();
if ((name.startsWith("get") || name.length() > 3)
&& Modifier.isPublic(m.getModifiers())
&& !Modifier.isStatic(m.getModifiers())
&& m.getParameterTypes().length == 0
&& m.getReturnType() == URL.class) {
urlTypeIndex = i;
attribMethod = name;
break LBL_PTS;
}
}
}
if(attribMethod == null) {
throw new IllegalStateException("fail to create adative class for interface " + type.getName()
+ ": not found url parameter or url attribute in parameters of method " + method.getName());
}

// Null point check
String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"%s argument == null\");",
urlTypeIndex, pts[urlTypeIndex].getName());
code.append(s);
s = String.format("\nif (arg%d.%s() == null) throw new IllegalArgumentException(\"%s argument %s() == null\");",
urlTypeIndex, attribMethod, pts[urlTypeIndex].getName(), attribMethod);
code.append(s);

s = String.format("%s url = arg%d.%s();",URL.class.getName(), urlTypeIndex, attribMethod);
code.append(s);
}

String[] value = adaptiveAnnotation.value();
// 没有设置Key,则使用“扩展点接口名的点分隔 作为Key
if(value.length == 0) {
char[] charArray = type.getSimpleName().toCharArray();
StringBuilder sb = new StringBuilder(128);
for (int i = 0; i < charArray.length; i++) {
if(Character.isUpperCase(charArray[i])) {
if(i != 0) {
sb.append(".");
}
sb.append(Character.toLowerCase(charArray[i]));
}
else {
sb.append(charArray[i]);
}
}
value = new String[] {sb.toString()};
}

boolean hasInvocation = false;
for (int i = 0; i < pts.length; ++i) {
if (pts[i].getName().equals("com.alibaba.dubbo.rpc.Invocation")) {
// Null Point check
String s = String.format("\nif (arg%d == null) throw new IllegalArgumentException(\"invocation == null\");", i);
code.append(s);
s = String.format("\nString methodName = arg%d.getMethodName();", i);
code.append(s);
hasInvocation = true;
break;
}
}

String defaultExtName = cachedDefaultName;
String getNameCode = null;
for (int i = value.length - 1; i >= 0; --i) {
if(i == value.length - 1) {
if(null != defaultExtName) {
if(!"protocol".equals(value[i]))
if (hasInvocation)
getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
else
getNameCode = String.format("url.getParameter(\"%s\", \"%s\")", value[i], defaultExtName);
else
getNameCode = String.format("( url.getProtocol() == null ? \"%s\" : url.getProtocol() )", defaultExtName);
}
else {
if(!"protocol".equals(value[i]))
if (hasInvocation)
getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
else
getNameCode = String.format("url.getParameter(\"%s\")", value[i]);
else
getNameCode = "url.getProtocol()";
}
}
else {
if(!"protocol".equals(value[i]))
if (hasInvocation)
getNameCode = String.format("url.getMethodParameter(methodName, \"%s\", \"%s\")", value[i], defaultExtName);
else
getNameCode = String.format("url.getParameter(\"%s\", %s)", value[i], getNameCode);
else
getNameCode = String.format("url.getProtocol() == null ? (%s) : url.getProtocol()", getNameCode);
}
}
code.append("\nString extName = ").append(getNameCode).append(";");
// check extName == null?
String s = String.format("\nif(extName == null) " +
"throw new IllegalStateException(\"Fail to get extension(%s) name from url(\" + url.toString() + \") use keys(%s)\");",
type.getName(), Arrays.toString(value));
code.append(s);

s = String.format("\n%s extension = (%<s)%s.getExtensionLoader(%s.class).getExtension(extName);",
type.getName(), ExtensionLoader.class.getSimpleName(), type.getName());
code.append(s);

// return statement
if (!rt.equals(void.class)) {
code.append("\nreturn ");
}

s = String.format("extension.%s(", method.getName());
code.append(s);
for (int i = 0; i < pts.length; i++) {
if (i != 0)
code.append(", ");
code.append("arg").append(i);
}
code.append(");");
}

codeBuidler.append("\npublic " + rt.getCanonicalName() + " " + method.getName() + "(");
for (int i = 0; i < pts.length; i ++) {
if (i > 0) {
codeBuidler.append(", ");
}
codeBuidler.append(pts[i].getCanonicalName());
codeBuidler.append(" ");
codeBuidler.append("arg" + i);
}
codeBuidler.append(")");
if (ets.length > 0) {
codeBuidler.append(" throws ");
for (int i = 0; i < ets.length; i ++) {
if (i > 0) {
codeBuidler.append(", ");
}
codeBuidler.append(pts[i].getCanonicalName());
}
}
codeBuidler.append(" {");
codeBuidler.append(code.toString());
codeBuidler.append("\n}");
}
codeBuidler.append("\n}");
if (logger.isDebugEnabled()) {
logger.debug(codeBuidler.toString());
}
return codeBuidler.toString();
}


ActivateExtension

getActivateExtension

根据传进来的参数,拿取url中指定的key对应的value,如果value为空直接返回null
所以使用ActivateExtension必须要指定keyvalue

1
2
3
4
public List<T> getActivateExtension(URL url, String key, String group) {
String value = url.getParameter(key);
return getActivateExtension(url, value == null || value.length() == 0 ? null : Constants.COMMA_SPLIT_PATTERN.split(value), group);
}


拿到value后进入以下方法,并附带了urlgroup
主要做了以下步骤

  • getExtensionClasses:获取当前加载器相关的Class信息,并且领出cachedActivates信息,cachedActivates为当前加载器下的满足以下条件就会加入

    1
    2
    3
    4
    Activate activate = clazz.getAnnotation(Activate.class);
    if (activate != null) {
    cachedActivates.put(names[0], activate);
    }
  • isMatchGroup:匹配group组是否符合,即consumer或provider,符合组即getExtension(name);取出对象信息

  • !names.contains(name) && isActive(….): 判断key是否相符,相符则加入到exts,如果未指定注解value也被加入进去

    META-INF中的name如果带有”-“标识则表现出为移除状态
    ! names.contains(name)先排除掉让没有key指定的对象先添加到list再排序list
    后期再加T ext = getExtension(name);对ext对象直接添加到exts列表,保证了调用的先后顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public List<T> getActivateExtension(URL url, String[] values, String group) {
List<T> exts = new ArrayList<T>();
List<String> names = values == null ? new ArrayList<String>(0) : Arrays.asList(values);
if (! names.contains(Constants.REMOVE_VALUE_PREFIX + Constants.DEFAULT_KEY)) {
getExtensionClasses();
for (Map.Entry<String, Activate> entry : cachedActivates.entrySet()) {
String name = entry.getKey();
Activate activate = entry.getValue();
if (isMatchGroup(group, activate.group())) {
T ext = getExtension(name);
if (! names.contains(name)
&& ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)
&& isActive(activate, url)) {
exts.add(ext);
}
}
}
Collections.sort(exts, ActivateComparator.COMPARATOR);
}
List<T> usrs = new ArrayList<T>();
for (int i = 0; i < names.size(); i ++) {
String name = names.get(i);
if (! name.startsWith(Constants.REMOVE_VALUE_PREFIX)
&& ! names.contains(Constants.REMOVE_VALUE_PREFIX + name)) {
if (Constants.DEFAULT_KEY.equals(name)) {
if (usrs.size() > 0) {
exts.addAll(0, usrs);
usrs.clear();
}
} else {
T ext = getExtension(name);
usrs.add(ext);
}
}
}
if (usrs.size() > 0) {
exts.addAll(usrs);
}
return exts;
}






加载META-INF接口相关实现类

getExtensionClasses

如果当前缓存类持有容器不存在相关的Map(name->实现类),则进行加载流程

1
2
3
4
5
6
7
8
9
10
11
12
13
private Map<String, Class<?>> getExtensionClasses() {
Map<String, Class<?>> classes = cachedClasses.get();
if (classes == null) {
synchronized (cachedClasses) {
classes = cachedClasses.get();
if (classes == null) {
classes = loadExtensionClasses();
cachedClasses.set(classes);
}
}
}
return classes;
}

loadExtensionClasses

组织加载流程三个目录

dir目标目录中加载接口-实现类配置文件

dir目录有这三类:

  • SERVICES_DIRECTORY = “META-INF/services/“
  • DUBBO_DIRECTORY = “META-INF/dubbo/“
  • DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + “internal/“

这些目录的文件样式如下:

1
monitor=com.alibaba.dubbo.monitor.support.MonitorFilter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private Map<String, Class<?>> loadExtensionClasses() {
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if(defaultAnnotation != null) {
String value = defaultAnnotation.value();
if(value != null && (value = value.trim()).length() > 0) {
String[] names = NAME_SEPARATOR.split(value);
if(names.length > 1) {
throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
+ ": " + Arrays.toString(names));
}
if(names.length == 1) cachedDefaultName = names[0];
}
}

Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
loadFile(extensionClasses, DUBBO_DIRECTORY);
loadFile(extensionClasses, SERVICES_DIRECTORY);
return extensionClasses;
}

loadFile

extensionClasses为成员变量cachedClasses

以下是主要加载文件的源代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {
String fileName = dir + type.getName();
try {
Enumeration<java.net.URL> urls;
ClassLoader classLoader = findClassLoader();
if (classLoader != null) {
urls = classLoader.getResources(fileName);
} else {
urls = ClassLoader.getSystemResources(fileName);
}
if (urls != null) {
while (urls.hasMoreElements()) {
java.net.URL url = urls.nextElement();
try {
BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));
try {
String line = null;
while ((line = reader.readLine()) != null) {
final int ci = line.indexOf('#');
if (ci >= 0) line = line.substring(0, ci);
line = line.trim();
if (line.length() > 0) {
try {
String name = null;
int i = line.indexOf('=');
if (i > 0) {
name = line.substring(0, i).trim();
line = line.substring(i + 1).trim();
}
//例如monitor=com.alibaba.dubbo.monitor.support.MonitorFilter
//被分解成name=monitor,line=com.alibaba.dubbo.monitor.support.MonitorFilter

if (line.length() > 0) {
Class<?> clazz = Class.forName(line, true, classLoader);
if (! type.isAssignableFrom(clazz)) {
throw new IllegalStateException("Error when load extension class(interface: " +
type + ", class line: " + clazz.getName() + "), class "
+ clazz.getName() + "is not subtype of interface.");
}
if (clazz.isAnnotationPresent(Adaptive.class)) {

//如果有Adaptive.class注解,就将现在的cachedAdaptiveClass成员变量设定这个类

if(cachedAdaptiveClass == null) {
cachedAdaptiveClass = clazz;
} else if (! cachedAdaptiveClass.equals(clazz)) {
throw new IllegalStateException("More than 1 adaptive class found: "
+ cachedAdaptiveClass.getClass().getName()
+ ", " + clazz.getClass().getName());
}

} else {
try {
//查看这个类是否有此类的构造器,如果有判定为Wrapper包裹类
//并且加入到wrappers列表

clazz.getConstructor(type);
Set<Class<?>> wrappers = cachedWrapperClasses;
if (wrappers == null) {
cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
wrappers = cachedWrapperClasses;
}
wrappers.add(clazz);
} catch (NoSuchMethodException e) {

//再查是否有注解
clazz.getConstructor();
if (name == null || name.length() == 0) {
name = findAnnotationName(clazz);
if (name == null || name.length() == 0) {
if (clazz.getSimpleName().length() > type.getSimpleName().length()
&& clazz.getSimpleName().endsWith(type.getSimpleName())) {
name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
} else {
throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);
}
}
}

//查询name中是否存在Activate注解
//如果有Activate注解,则put到cachedActivates
//最后将类放入cachedNames,extensionClasses

String[] names = NAME_SEPARATOR.split(name);
if (names != null && names.length > 0) {
Activate activate = clazz.getAnnotation(Activate.class);
if (activate != null) {
cachedActivates.put(names[0], activate);
}
for (String n : names) {
if (! cachedNames.containsKey(clazz)) {
cachedNames.put(clazz, n);
}
Class<?> c = extensionClasses.get(n);
if (c == null) {
extensionClasses.put(n, clazz);
} else if (c != clazz) {
throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
}
}
}
}
}
}
} catch (Throwable t) {
IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t);
exceptions.put(line, e);
}
}
} // end of while read lines
} finally {
reader.close();
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " +
type + ", class file: " + url + ") in " + url, t);
}
} // end of while urls
}
} catch (Throwable t) {
logger.error("Exception when load extension class(interface: " +
type + ", description file: " + fileName + ").", t);
}
}

以上代码经历了以下主要步骤

从主要三个目录加载文件
1
2
3
4
5
6
SERVICES_DIRECTORY = "META-INF/services/"
DUBBO_DIRECTORY = "META-INF/dubbo/"
DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/"

//文件内容如下
monitor=com.alibaba.dubbo.monitor.support.MonitorFilter
分解name,line两部分
1
2
3
4
while 循环每一行
name = line.substring(0, i).trim();
line = line.substring(i + 1).trim();
取出name和line两部分
设定加cachedAdaptiveClass
1
2
clazz.isAnnotationPresent(Adaptive.class)
//如果有Adaptive.class注解,就将现在的cachedAdaptiveClass成员变量设定这个类
添加cachedActivates
1
2
3
4
5
6
Activate activate = clazz.getAnnotation(Activate.class);
if (activate != null) {
cachedActivates.put(names[0], activate);
}
//查询name中是否存在Activate注解
//如果有Activate注解,则put到cachedActivates
添加cachedNames,extensionClasses
1
2
3
4
5
6
7
8
9
10
11
12
//最后将类放入cachedNames,extensionClasses
for (String n : names) {
if (! cachedNames.containsKey(clazz)) {
cachedNames.put(clazz, n);
}
Class<?> c = extensionClasses.get(n);
if (c == null) {
extensionClasses.put(n, clazz);
} else if (c != clazz) {
throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
}
}






服务-Consumer,Producer

消费者

消费者-服务引用

流程序列图

01




大致流程

  • ReferenceBean实现FactoryBeangetObject方法进入

    DubboNamespaceHandler解析registerBeanDefinitionParser,从而带入ReferenceBean到Spring环境中
    所有的消费者都将会在身为FactoryBeanReferenceBeangetObject方法拿取引用服务的Bean

  • refprotocol[Protocol$Adaptive].refer通过url为registry关键字找RegistryProtocol#refer
  • RegistryProtocol#refer首先做注册registry.register
  • RegistryProtocol#refer接着做注册中心订阅directory.subscribe
  • RegistryDirectory往zk进行操作zookeeperRegistry.subscribe,并注册自身的NotifyListener#notify方法
  • RegistryDirectory在zk得到响应notify方法被调用接着引出refreshInvoker整理从注册中心得到的服务Invoker列表
  • refreshInvoker主要操作toInvokerstoMethodInvokers两个相关的方法来换取相关的Invokers

    • toInvokers:

      1
      invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);

      由上方代码引出DubboProtocol

    • toMethodInvokers:

      1
      newMethodInvokerMap.put(method, route(methodInvokers, method));

      由上方代码引出route机制

  • DubboProtocol#refer主要构件DubboInvoker并且建立客户端链接getClients(url),并添加invokers

    1
    2
    DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
    invokers.add(invoker);
  • FailoverClusterRegistryDirectory结束notify之后会得到根据route规则通过directory.list拿到的Invokers列表,然后用FailoverClusterInvoker包裹

  • JavassistProxyFactory将上一步得到的ClusterInvoker到这里来进行getProxy代理操作

消费者-调用

流程序列图

doInvoke

大致流程

  • InvokerInvocationHandler由于所有的引用服务都被JavassistProxyFactory.getProxy所以的Invoker调用都被这个InvokerInvocationHandler包裹
  • MockClusterInvoker因为有MockClusterWrapper所以最后在FailoverCluster上包裹了一层

    主要用于判断:有无mock,强制mock,失败后mock三种失败mock策略

  • FailoverClusterInvoker在父级AbstractClusterInvoker选定好后loadbalance策略,到了本对象的doInvoke直接激活负载策略,如遇失败重试,具体重试次数根据设定来

    主要用于各类容错机制

  • InvokerWrapper因为在InvokerDelegete的初始化,作为父级类包裹了invoker
  • ProtocolFilterWrapper由于DubboProtocol要经过Filter责任链的洗礼,所以这里的Invoker全部是Consumer端的Filter

    通过责任链可以添加各类扩展

  • ListenerExporterWrapper由于ProtocolListenerWrapper也是DubboProtocol包裹类,在refer操作中被强制包上了ListenerInvokerWrapper

    实现InvokerListener进行invoker监听动作

  • DubboInvoker最终来到了这里拿取client进行远程调用

invoker 调用细节

route.route会用在两个地方

  • notify:写入invoker的时候会引入ConditionRouter进行

    1
    2
    router.getUrl() != null判断过滤掉MockInvokersSelector
    ConditionRouter的matchThen进行主要的invoker过滤
  • list:调用的时候读取invoker

    1
    2
    router.getUrl() == null 选定MockInvokersSelector  
    参数值`invocation.need.mock`有并且`invocation.getAttachments`不为空则返回`MockedInvokers`否则返回`NormalInvokers`






生产者

生产者-服务暴露

流程序列图

05

大致流程

  • 配置为none不暴露
  • 配置不是remote的情况下做本地暴露 (配置为remote,则表示只暴露远程服务)
  • 如果配置不是local则暴露为远程服务.(配置为local,则表示只暴露远程服务)
本地暴露
  • proxyFactory.getInvoker:
  • protocol.export

    Protocol$Adaptive(InJvmProtocol)
    两个Wrapper:ProtocolFilterWrapper(官方9个Filter),ProtocolListenerWrapper(包裹ListenerExportWrapper)
    在Filter包裹后,Listener包裹之前构建InJvmExport

  • exporters.add(exporter);

远程暴露
  • proxyFactory.getInvoker
  • protocol.export

    Protocol$Adaptive(RegistryProtocol)

    • 构建InvokeDelegate
    • protocol.export(invokeDelegate)暴露服务
    • registry.registerOverrideListener注册

      Protocol$Adaptive(DubboProtocol)

    • ProtocolFilterWrapper嵌套责任链

    • DubboExport将Invoke转DubboExport
    • openServer开启Netty网络服务

      return New Export(....)返回Export出去

  • exporters.add(exporter);






注册中心-Zookeeper

zookeeperRegistry

FailbackRegistry

作为ZookeeperRegistry的父类:专注用于失败后的重试机制
DubboRegistryFailedRetryTimer线程池用来做定时任务

  • doRegister : 注册中心失败

  • doSubscribe : 监听注册中心失败重试

  • doUnsubscribe : 向服务器端发送取消订阅请求失败重试

  • doNotify : 将失败的通知请求记录到失败列表,定时重试






网络模型-Netty

HeaderExchangeClient/HeaderExchangeServer->NettyServer/NettyClient
NettyServer/NettyClient—>MultiMessageHandler–>HeartbeatHandler—> AllChannelHandler
[HeaderExchanger构建] DecodeHandler-> HeaderExchangeHandler->NettyHandler

消费者-网络模型

03

  • DubboInvoke获取Client判读是否是共享client还是非共享client
  • Exchangers工具类,获得合适的Exchanger进行connect链接,默认为HeaderExchanger
  • HeaderExchanger进行RequestHandler参数包裹

    HeaderExchangeHandler包裹第一层
    DecodeHandler包裹第二层
    Transporters.connect起客户端链接
    HeaderExchangeClient包裹第三层,并且在初始化防范中起dubbo-remoting-client-heartbeat心跳

  • Transporters工具类,根据上一轮步骤Transporters.connect起客户端链接衍生

    1
    getTransporter() //默认获取NettyTransporter
  • NettyTransporter初始化新的NettyClient返回回去

  • NettyClient

    wrapChannelHandler进行包裹

    • 寻找Dispatch,默认AllDispatcher实现类

      • AllChannelHandler包裹Url以及handler(被DecodeHandler,HeaderExchangeHandler包裹)

      • WrappedChannelHandler初始化中起executor(DubboClientHandler)

      • 再初始化一个dataStore存储url->executor

    • HeartbeatHandler包裹
    • MultiMessageHandler包裹




参数功能点-URL

屏蔽功能

dubbo管控台调控

消费者 禁止 被禁止的客户端将收到禁止访问异常。
notify的时候会带上route的url

1
route://0.0.0.0/cn.coderss.service.user.api.UserService?category=routers&dynamic=false&enabled=true&force=true&name=cn.coderss.service.user.api.UserService blackwhitelist&priority=0&router=condition&rule=consumer.host+%3D+192.168.3.149+%3D%3E+false&runtime=false

“rule” -> “consumer.host+=+192.168.3.149+=>+false”

1
2
3
4
RegistryDirectory#notify->refreshInvoker->toMethodInvokers->newMethodInvokerMap.put(method, route(methodInvokers, method));

invokers = router.route(invokers, getConsumerUrl(), invocation);
//路由得到无Invokers结果

Mock服务降级&&容错机制

注意点:初始化的时候如果mock=force:return+null或者mock=fail:return+null都会失效,只有mock=retur null才能行
因为AbstractMethodConfig#setMock有相关的mock参数检测,只允许.数字字母

dubbo管控台调控

1
2
zookeeper注册中心变更
RegistryDirectory#notify->refreshInvoker->toInvokers->mergeUrl

导致变更以下代码

1
2
//directoryUrl 与 override 合并是在notify的最后,这里不能够处理
this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters()); // 合并提供者参数

调用的时候MockClusterInvoker#invoke

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//获取`overrideDirectoryUrl`相关的mock参数  
String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();


//no mock
result = this.invoker.invoke(invocation);

//force:direct mock
result = doMockInvoke(invocation, null);

//fail-mock
try {
result = this.invoker.invoke(invocation);
}catch (RpcException e) {
if (e.isBiz()) {
throw e;
} else {
if (logger.isWarnEnabled()) {
logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
}
result = doMockInvoke(invocation, e);
}
}

mock=force:return+null当强制返回的时候或者fail:return+null的情况下
首先都会去RegistryDirectory#list找是否提供相关的mock协议而不是dubbo协议的服务提供者
如果有则调用这些invokers否则自己去组装一个mockInvoker

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
MockInvoker#invoke

mock = normallizeMock(URL.decode(mock));
if (Constants.RETURN_PREFIX.trim().equalsIgnoreCase(mock.trim())){
RpcResult result = new RpcResult();
result.setValue(null);
return result;
} else if (mock.startsWith(Constants.RETURN_PREFIX)) {
mock = mock.substring(Constants.RETURN_PREFIX.length()).trim();
mock = mock.replace('`', '"');
try {
Type[] returnTypes = RpcUtils.getReturnTypes(invocation);
Object value = parseMockValue(mock, returnTypes);
return new RpcResult(value);
} catch (Exception ew) {
throw new RpcException("mock return invoke error. method :" + invocation.getMethodName() + ", mock:" + mock + ", url: "+ url , ew);
}
} else if (mock.startsWith(Constants.THROW_PREFIX)) {
mock = mock.substring(Constants.THROW_PREFIX.length()).trim();
mock = mock.replace('`', '"');
if (StringUtils.isBlank(mock)){
throw new RpcException(" mocked exception for Service degradation. ");
} else { //用户自定义类
Throwable t = getThrowable(mock);
throw new RpcException(RpcException.BIZ_EXCEPTION, t);
}
} else { //impl mock
try {
Invoker<T> invoker = getInvoker(mock);
return invoker.invoke(invocation);
} catch (Throwable t) {
throw new RpcException("Failed to create mock implemention class " + mock , t);
}
}

初始化服务调控

1
2
3
4
5
6
7
8
9
10
@Reference(timeout = 600000, mock = "return null")
//在容错的时候不允许mock= "force:return+null"或者mock = "fail:return+null" 这是容错不是服务降级
//否则在setMock属性会去检测 checkName->`Pattern.compile("[\\-._0-9a-zA-Z]+")` 这些范围才允许mock容错生效
UserService service;


@RequestMapping("/user")
public User getUser(String name){
return service.getUserByName(name);
}

线程模型

image-20180803143759053

业务线程模型

Dispatcher

  • all 除发送消息之外,其他全部使用线程池处理;包括请求,响应,连接事件,断开事件,心跳等。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    public void connected(Channel channel) throws RemotingException {
    ExecutorService cexecutor = getExecutorService();
    try{
    cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
    }catch (Throwable t) {
    throw new ExecutionException("connect event", channel, getClass()+" error when process connected event ." , t);
    }
    }

    public void disconnected(Channel channel) throws RemotingException {
    ExecutorService cexecutor = getExecutorService();
    try{
    cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED));
    }catch (Throwable t) {
    throw new ExecutionException("disconnect event", channel, getClass()+" error when process disconnected event ." , t);
    }
    }

    public void received(Channel channel, Object message) throws RemotingException {
    ExecutorService cexecutor = getExecutorService();
    try {
    cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
    throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException {
    ExecutorService cexecutor = getExecutorService();
    try{
    cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception));
    }catch (Throwable t) {
    throw new ExecutionException("caught event", channel, getClass()+" error when process caught event ." , t);
    }
    }

    private ExecutorService getExecutorService() {
    ExecutorService cexecutor = executor;
    if (cexecutor == null || cexecutor.isShutdown()) {
    cexecutor = SHARED_EXECUTOR;
    }
    return cexecutor;
    }
  • execution 除发送消息之外,其他全部使用线程池处理;

    与AllChannelHandler不同之处在于若创建的线程池ExecutorService不可用,AllChannelHandler将使用共享线程池,而ExecutionChannelHandler只有报错了。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    public void connected(Channel channel) throws RemotingException {
    executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
    }

    public void disconnected(Channel channel) throws RemotingException {
    executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED));
    }

    public void received(Channel channel, Object message) throws RemotingException {
    executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException {
    executor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception));
    }
  • direct 所有消息都不派发到线程池,全部在 IO 线程上直接执行。

    1
    2
    3
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
    return handler;
    }
  • message 只有请求响应消息派发到线程池,其它连接断开事件,心跳等消息,直接在 IO 线程上执行。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
    return new MessageOnlyChannelHandler(handler, url);
    }
    ......

    public void received(Channel channel, Object message) throws RemotingException {
    ExecutorService cexecutor = executor;
    if (cexecutor == null || cexecutor.isShutdown()) {
    cexecutor = SHARED_EXECUTOR;
    }
    try {
    cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
    throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
    }
  • connection 在 IO 线程上,将连接断开事件放入队列,有序逐个执行,其它消息派发到线程池。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
    super(handler, url);
    String threadName = url.getParameter(Constants.THREAD_NAME_KEY,Constants.DEFAULT_THREAD_NAME);
    connectionExecutor = new ThreadPoolExecutor(1, 1,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
    new NamedThreadFactory(threadName, true),
    new AbortPolicyWithReport(threadName, url)
    ); // FIXME 没有地方释放connectionExecutor!
    queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }

    public void connected(Channel channel) throws RemotingException {
    try{
    checkQueueLength();
    connectionExecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
    }catch (Throwable t) {
    throw new ExecutionException("connect event", channel, getClass()+" error when process connected event ." , t);
    }
    }

    public void disconnected(Channel channel) throws RemotingException {
    try{
    checkQueueLength();
    connectionExecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED));
    }catch (Throwable t) {
    throw new ExecutionException("disconnected event", channel, getClass()+" error when process disconnected event ." , t);
    }
    }

    public void received(Channel channel, Object message) throws RemotingException {
    ExecutorService cexecutor = executor;
    if (cexecutor == null || cexecutor.isShutdown()) {
    cexecutor = SHARED_EXECUTOR;
    }
    try {
    cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
    } catch (Throwable t) {
    throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
    }
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException {
    ExecutorService cexecutor = executor;
    if (cexecutor == null || cexecutor.isShutdown()) {
    cexecutor = SHARED_EXECUTOR;
    }
    try{
    cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception));
    }catch (Throwable t) {
    throw new ExecutionException("caught event", channel, getClass()+" error when process caught event ." , t);
    }
    }

    private void checkQueueLength(){
    if (connectionExecutor.getQueue().size() > queuewarninglimit){
    logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: "+connectionExecutor.getQueue().size()+" exceed the warning limit number :"+queuewarninglimit));
    }
    }

ThreadPool

  • fixed 固定大小线程池,启动时建立线程,不关闭,一直持有。(缺省)
  • cached 缓存线程池,空闲一分钟自动删除,需要时重建。
  • limited 可伸缩线程池,但池中的线程数只会增长不会收缩。只增长不收缩的目的是为了避免收缩时突然来了大流量引起的性能问题。

DubboRegistryFailedRetryTimer

注册中心相关 失败执行后 - 定时循环重新执行任务 - 执行器

参数retry.period : 注册中心失败事件重试事件,即此线程的定时数,单位默认MILLISECONDS

FailbackRegistry 作为ZookeeperRegistry的父类:专注用于失败后的重试机制

  • doRegister : 注册中心失败

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    //failedRegistered 数值的由来
    //failedRegistered.remove(url);
    //failedUnregistered.remove(url);
    try {
    // 向服务器端发送注册请求
    doRegister(url);
    } catch (Exception e) {
    // 将失败的注册请求记录到失败列表,定时重试
    failedRegistered.add(url);
    }


    //只要成功了就移除failedRegistered.remove(url);
    protected void doRegister(URL url) {
    try {
    zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
    } catch (Throwable e) {
    throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
    }
    }
  • doSubscribe : 监听注册中心失败重试

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    failedSubscribed

    //removeFailedSubscribed(url, listener);
    try {
    // 向服务器端发送订阅请求
    doSubscribe(url, listener);
    } catch (Exception e) {
    // 将失败的订阅请求记录到失败列表,定时重试
    addFailedSubscribed(url, listener);
    }
  • doUnsubscribe : 向服务器端发送取消订阅请求失败重试

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    removeFailedSubscribed(url, listener);
    try {
    // 向服务器端发送取消订阅请求
    doUnsubscribe(url, listener);
    } catch (Exception e) {
    // 将失败的取消订阅请求记录到失败列表,定时重试
    Set<NotifyListener> listeners = failedUnsubscribed.get(url);
    if (listeners == null) {
    failedUnsubscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
    listeners = failedUnsubscribed.get(url);
    }
    listeners.add(listener);
    }
  • doNotify : 将失败的通知请求记录到失败列表,定时重试

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
     try {
    doNotify(url, listener, urls);
    } catch (Exception t) {
    // 将失败的通知请求记录到失败列表,定时重试
    Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
    if (listeners == null) {
    failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
    listeners = failedNotified.get(url);
    }
    listeners.put(listener, urls);
    logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
    }

forking-cluster-timer

Cluster选用Forking Cluster
并行调用多个服务器,只要一个成功即返回

failback-cluster-timer

Cluster选用Failback Cluster
failback-cluster-timer线程池用于定时重发失败的调用

DubboClientHandler

NettyClient产生线程池
ChannelHandlers#wrapNettyClient相关处理业务的handler包裹在Dispatcher

1
<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />

dubbo-remoting-client-heartbeat

HeaderExchangeClient产生此线程池,执行HeartBeatTask

heartbeat:性能调优心跳间隔,对于长连接,当物理层断开时;比如拔网线,TCP的FIN消息来不及发送,对方收不到断开事件,此时需要心跳来帮助检查连接是否已断开

DubboClientReconnectTimer

Dubbo客户端断线重连
心跳和断线重连线程池工作职责不一样

  • 心跳 : 只管发send信息到各个channelProvider,如果遇到超时则进行重连,期间正常发的时候不去管是否有断开连接
  • 断线重连 : 定时判断是否连接,如遇没有连接则进行重连
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Runnable connectStatusCheckCommand =  new Runnable() {
public void run() {
try {
if (! isConnected()) {
connect();
} else {
lastConnectedTime = System.currentTimeMillis();
}
} catch (Throwable t) {
String errorMsg = "client reconnect to "+getUrl().getAddress()+" find error . url: "+ getUrl();
// wait registry sync provider list
if (System.currentTimeMillis() - lastConnectedTime > shutdown_timeout){
if (!reconnect_error_log_flag.get()){
reconnect_error_log_flag.set(true);
logger.error(errorMsg, t);
return ;
}
}
if ( reconnect_count.getAndIncrement() % reconnect_warning_period == 0){
logger.warn(errorMsg, t);
}
}
}
};
reconnectExecutorFuture = reconnectExecutorService.scheduleWithFixedDelay(connectStatusCheckCommand, reconnect, reconnect, TimeUnit.MILLISECONDS);

DubboSaveRegistryCache

AbstractRegistry产生,主要用于配置文件缓存定时写入
每次注册中心notify的时候会激发saveProperties(url);->registryCacheExecutor.execute(new SaveProperties(version));函数
用于写入invoker的时候写入本地缓存

1
registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));

DubboServerHandler

NettyServer产生线程池
ChannelHandlers#wrapNettyClient相关处理业务的handler包裹在Dispatcher

1
<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />

dubbo-remoting-server-heartbeat

HeaderExchangeServer产生线程池,执行HeartBeatTask

handler包裹

ExchangerChannelHandler与ExchangeChannelHandler不一样
一个用来处理响应并回复,一个用来作为NettyClient包裹发送

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
MultiMessageHandler(new HeartbeatHandler(
ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension().dispatch(handler, url))
)

响应:
MultiMessageHandler------>HeartbeatHandler------>Dispatcher线程池------>
DecodeHandler------>HeaderExchangerHandler------>DubboProtocol#requestHandler
------>DubboExporter------>proxyFactory#invoker

发送:
HeaderExchangeHandler#received
Response response = handleRequest(exchangeChannel, request);//给DubboProtocol#requestHandler执行
channel.send(response);//再发出去

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
整体的结构:
MultiMessageHandler------>HeartbeatHandler------>Dispatcher线程池------>
DecodeHandler------>HeaderExchangerHandler------>DubboProtocol#requestHandler


发送:
HeaderExchangeClient内部把NettyTransport返回的NettyClient作为client后
this.channel = new HeaderExchangeChannel(client);
还包了一层HeaderExchangeChannel与所以request方法实际简介运营nettyClient如下:

public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion("2.0.0");
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = new DefaultFuture(channel, req, timeout);
try{
channel.send(req);
}catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}




响应:
HeaderExchangerHandler会拦截
if (message instanceof Response) {
handleResponse(channel, (Response) message);
}

static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}