SpringCloud技术栈-相关的源码详解笔记
Auto Configuration @Enable* 注解 @SpringBootApplication
注解带入了@EnableAutoConfiguration
@EnableAutoConfiguration
又带有@Import(EnableAutoConfigurationImportSelector.class)
使用了Spring Core包的SpringFactoriesLoader类的loadFactoryNamesof()方法。 SpringFactoriesLoader会查询META-INF/spring.factories文件中包含的JAR文件。
DeferredImportSelector 接口 1 2 3 4 5 6 7 8 sequenceDiagram SpringApplication->>SpringApplication:run(args); SpringApplication->>SpringApplication:refreshContext SpringApplication->>AbstractApplicationContext:refresh AbstractApplicationContext->>AbstractApplicationContext:invokeBeanFactoryPostProcessors AbstractApplicationContext->>PostProcessorRegistrationDelegate:invokeBeanFactoryPostProcessors PostProcessorRegistrationDelegate->>postProcessor:postProcessBeanDefinitionRegistry
1 2 3 4 5 6 sequenceDiagram ConfigurationClassPostProcessor->>ConfigurationClassPostProcessor:processConfigBeanDefinitions; ConfigurationClassPostProcessor->>ConfigurationClassParser:parse ConfigurationClassParser->>ConfigurationClassParser:processDeferredImportSelectors ConfigurationClassParser->>DeferredImportSelector:selectImports
以上逻辑就实现了DeferredImportSelector,才得以重写的selectImports起生效
getCandidateConfigurations利用了SpringFactoriesLoader.loadFactoryNames寻找相关的”META-INF/spring.factories”
属性映射 1 2 3 4 5 6 7 8 9 10 @ConfigurationProperties (prefix = "spring.data.mongodb" )public class MongoProperties { private String host; private int port = DBPort.PORT; private String uri = "mongodb://localhost/test" ; private String database; }
@ConfigurationProperties注释将POJO关联到指定前缀的每一个属性。 例如:spring.data.mongodb.port属性将映射到这个类的端口属性。
@Conditional 注解 以下各个注解有很大帮助实现自动配置功能
@ConditionalOnBean
@ConditionalOnClass
@ConditionalOnExpression
@ConditionalOnMissingBean
@ConditionalOnMissingClass
@ConditionalOnNotWebApplication
@ConditionalOnResource
@ConditionalOnWebApplication
Eureka
概述
集群重要类:PeerAwareInstanceRegistryImpl
新的Eureka Server节点加入集群后的影响
新服务注册(Register)注册时的影响
服务心跳(renew)
服务下线和剔除
自我保护模式
Eureka Server的集群同步操作 Eureka官网的架构图,下方的操作需要结合下图理解:
PeerAwareInstanceRegistryImpl 集群相关重要的类com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl
:为了保证集群里所有Eureka Server节点的状态同步 所有以下操作都会同步到集群的所有服务上:服务注册(Registers)、服务更新(Renewals)、服务取消(Cancels),服务超时(Expirations)和服务状态变更(Status Changes)。
以下是一些部分方法:
syncUp:在Eureka Server重启或新的Eureka Server节点加进来的,会执行初始化,从集群其他节点中获取所有的实例注册信息,从而能够正常提供服务。当Eureka - Server启动时,它会从其它节点获取所有的注册信息,如果获取同步失败,它在一定时间(此值由决定)内拒绝服务。
replicateToPeers: 同步以下操作到所有的集群节点:服务注册(Registers)、服务更新(Renewals)、服务取消(Cancels),服务超时(Expirations)和服务状态变更(Status Changes)
register:注册实例,并且复印此实例的信息到所有的eureka server的节点。如果其它Eureka Server调用此节点,只在本节点更新实例信息,避免通知其他节点执行更新
renew:心跳,同步集群
cancel
其他
Eureka Server集群之间的状态是采用异步方式同步的,所以不保证节点间的状态一定是一致的,不过基本能保证最终状态是一致的。
新的Eureka Server节点加入集群后的影响 当有新的节点加入到集群中,会对现在Eureka Server和Eureka Client有什么影响以及他们如何发现新增的Eureka Server节点:
新服务注册(Register)注册时的影响 Service Provider要对外提供服务,把自己注册到Eureka Server上。如果配置参数eureka.client.registerWithEureka=true(默认值true)时,会向Eureka Server注册进行注册,Eureka Server会保存注册信息到内存中。
Service Consumer为了避免每次调用服务请求都需要向Eureka Server获取服务实例的注册信息,此时需要设置eureka.client.fetchRegistry=true,它会在本地缓存所有实例注册信息。为了保证缓存数据的有效性,它会定时(值由eureka.client.registry-fetch-interval-seconds定义,默认值为30s)向注册中心更新实例。
服务心跳(renew) 服务实例会通过心跳(eureka.instance.lease-renewal-interval-in-seconds定义心跳的频率,默认值为30s)续约的方式向Eureka Server定时更新自己的状态。Eureka Server收到心跳后,会通知集群里的其它Eureka Server更新此实例的状态。Service Provider/Service Consumer也会定时更新缓存的实例信息。
服务下线和剔除 服务的下线有两种情况:
在Service Provider服务shutdown的时候,主动通知Eureka Server把自己剔除,从而避免客户端调用已经下线的服务。
Eureka Server会定时(间隔值是eureka.server.eviction-interval-timer-in-ms,默认值为0,默认情况不删除实例)进行检查,如果发现实例在在一定时间(此值由eureka.instance.lease-expiration-duration-in-seconds定义,默认值为90s)内没有收到心跳,则会注销此实例。
这种情况下,Eureka Client的最多需要[eureka.instance.lease-renewal-interval-in-seconds + eureka.client.registry-fetch-interval-seconds]时间才发现服务已经下线。同理,一个新的服务上线后,Eureka Client的服务消费方最多需要相同的时间才发现服务已经上线
服务下线,同时会更新到Eureka Server其他节点和Eureka client的缓存,流程类似同以上的register过程
自我保护模式 如果Eureka Server最近1分钟收到renew的次数小于阈值(即预期的最小值),则会触发自我保护模式,此时Eureka Server此时会认为这是网络问题,它不会注销任何过期的实例。等到最近收到renew的次数大于阈值后,则Eureka Server退出自我保护模式。
自我保护模式阈值计算:
每个instance的预期心跳数目 = 60/每个instance的心跳间隔秒数
阈值 = 所有注册到服务的instance的数量的预期心跳之和 *自我保护系数
以上的参数都可配置的:
instance的心跳间隔秒数:eureka.instance.lease-renewal-interval-in-seconds
自我保护系数:eureka.server.renewal-percent-threshold
如果我们的实例比较少且是内部网络时,推荐关掉此选项。我们也可以通过eureka.server.enable-self-preservation = false来禁用自我保护系数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 server: port: 10761 spring: application: name: cloud-registration-center eureka: instance: preferIpAddress: true lease-renewal-interval-in-seconds: 3 lease-expiration-duration-in-seconds: 7 client: registerWithEureka: true fetchRegistry: false eureka-service-url-poll-interval-seconds: 60 registry-fetch-interval-seconds: 5 serviceUrl: defaultZone: http://127.0.0.1:10761/eureka/ server: enable-self-preservation: false eviction-interval-timer-in-ms: 3000 wait-time-in-ms-when-sync-empty: 6000000 peer-eureka-nodes-update-interval-ms: 60000
Eureka-Client EurekaClientAutoConfiguration
被spring.factories
纳入,自动被加载实例beanEurekaClientAutoConfiguration
里面有eurekaClient
存在
1 2 3 4 5 sequenceDiagram EmbeddedWebApplicationContext->>AbstractApplicationContext:super.refresh(); AbstractApplicationContext->>AbstractApplicationContext:finishBeanFactoryInitialization(); AbstractApplicationContext->>AbstractApplicationContext:beanFactory.preInstantiateSingletons();
在eurekaClient
的实例化Bean中1 new CloudEurekaClient(manager, config, this.optionalArgs,this.context);
而CloudEurekaClient
又继承于DiscoveryClient
至此会实例化相关的DiscoveryClient
CloudEurekaClient继承与DiscoveryClient
创建 EurekaInstanceConfig对象
使用 EurekaInstanceConfig对象 创建 InstanceInfo对象
使用 EurekaInstanceConfig对象 + InstanceInfo对象 创建
ApplicationInfoManager对象
创建 EurekaClientConfig对象
使用 ApplicationInfoManager对象 + EurekaClientConfig对象 创建 EurekaClient对象
其中ApplicationInfoManager包括
EurekaInstanceConfig
InstanceInfo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class ApplicationInfoManager { private static ApplicationInfoManager instance = new ApplicationInfoManager(null , null , null ); protected final Map<String, StatusChangeListener> listeners; private final InstanceStatusMapper instanceStatusMapper; private InstanceInfo instanceInfo; private EurekaInstanceConfig config; public ApplicationInfoManager (EurekaInstanceConfig config, InstanceInfo instanceInfo, OptionalArgs optionalArgs) { this .config = config; this .instanceInfo = instanceInfo; this .listeners = new ConcurrentHashMap<String, StatusChangeListener>(); if (optionalArgs != null ) { this .instanceStatusMapper = optionalArgs.getInstanceStatusMapper(); } else { this .instanceStatusMapper = NO_OP_MAPPER; } instance = this ; } }
EurekaInstanceConfig,重在应用实例,例如,应用名、应用的端口等等。此处应用指的是,Application Consumer 和 Application Provider。
EurekaClientConfig,重在 Eureka-Client,例如, 连接的 Eureka-Server 的地址、获取服务提供者列表的频率、注册自身为服务提供者的频率等等。
初始化线程池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 private final ScheduledExecutorService scheduler;private final ThreadPoolExecutor heartbeatExecutor;private final ThreadPoolExecutor cacheRefreshExecutor;scheduler = Executors.newScheduledThreadPool(2 , new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d" ) .setDaemon(true ) .build()); heartbeatExecutor = new ThreadPoolExecutor( 1 , clientConfig.getHeartbeatExecutorThreadPoolSize(), 0 , TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d" ) .setDaemon(true ) .build() ); cacheRefreshExecutor = new ThreadPoolExecutor( 1 , clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0 , TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d" ) .setDaemon(true ) .build() );
scheduler:定时任务线程池,初始化大小为 2,定时给任务,一个给 heartbeatExecutor,一个给 cacheRefreshExecutor
heartbeatExecutor,cacheRefreshExecutor:在提交给scheduler 才声明具体的任务。
初始化定时任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 initScheduledTasks(); private void initScheduledTasks () { if (clientConfig.shouldFetchRegistry()) { int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); scheduler.schedule( new TimedSupervisorTask( "cacheRefresh" , scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs); scheduler.schedule( new TimedSupervisorTask( "heartbeat" , scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); instanceInfoReplicator = new InstanceInfoReplicator( this , instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2 ); statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId () { return "statusChangeListener" ; } @Override public void notify (StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { logger.warn("Saw local status change event {}" , statusChangeEvent); } else { logger.info("Saw local status change event {}" , statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration" ); } }
定时提交任务到Heartbeat线程池,另一个任务到cacheRefresh线程池
初始化 Eureka 网络通信相关 1 2 3 eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args);
Eureka Server Spring Cloud Eureka Server1 2 org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
自动摄取了EurekaServerAutoConfiguration
配置类 里面包含了EurekaServerBootstrap
这个Bean
EurekaServerBootstrap 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class EurekaBootStrap implements ServletContextListener { @Override public void contextInitialized (ServletContextEvent event) { try { initEurekaEnvironment(); initEurekaServerContext(); ServletContext sc = event.getServletContext(); sc.setAttribute(EurekaServerContext.class.getName(), serverContext); } catch (Throwable e) { logger.error("Cannot bootstrap eureka server :" , e); throw new RuntimeException("Cannot bootstrap eureka server :" , e); } } }
Eureka-Server 配置环境 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 private static final String TEST = "test" ;private static final String ARCHAIUS_DEPLOYMENT_ENVIRONMENT = "archaius.deployment.environment" ;private static final String EUREKA_ENVIRONMENT = "eureka.environment" ;private static final String CLOUD = "cloud" ;private static final String DEFAULT = "default" ;private static final String ARCHAIUS_DEPLOYMENT_DATACENTER = "archaius.deployment.datacenter" ;private static final String EUREKA_DATACENTER = "eureka.datacenter" ;protected void initEurekaEnvironment () throws Exception { logger.info("Setting the eureka configuration.." ); String dataCenter = ConfigurationManager.getConfigInstance().getString(EUREKA_DATACENTER); if (dataCenter == null ) { logger.info("Eureka data center value eureka.datacenter is not set, defaulting to default" ); ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, DEFAULT); } else { ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_DATACENTER, dataCenter); } String environment = ConfigurationManager.getConfigInstance().getString(EUREKA_ENVIRONMENT); if (environment == null ) { ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, TEST); logger.info("Eureka environment value eureka.environment is not set, defaulting to test" ); } }
EurekaBootStrap Eureka Client Eureka-Server 内嵌 Eureka-Client,用于和 Eureka-Server 集群里其他节点通信交互。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 ApplicationInfoManager applicationInfoManager; if (eurekaClient == null ) { EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext()) ? new CloudInstanceConfig() : new MyDataCenterInstanceConfig(); applicationInfoManager = new ApplicationInfoManager( instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get()); EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig(); eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig); } else { applicationInfoManager = eurekaClient.getApplicationInfoManager(); }
注册
Eureka-Client 发起注册 | 应用实例信息复制器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 public class DiscoveryClient implements EurekaClient { private ApplicationInfoManager.StatusChangeListener statusChangeListener; private InstanceInfoReplicator instanceInfoReplicator; private void initScheduledTasks () { if (clientConfig.shouldRegisterWithEureka()) { instanceInfoReplicator = new InstanceInfoReplicator( this , instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2 ); statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } } }
刷新应用实例信息 InstanceInfoReplicator#run
函数里面调用
DiscoveryClient#refreshInstanceInfo()
方法,刷新应用实例信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 void refreshInstanceInfo () { applicationInfoManager.refreshDataCenterInfoIfRequired(); applicationInfoManager.refreshLeaseInfoIfRequired(); InstanceStatus status; try { status = getHealthCheckHandler().getStatus(instanceInfo.getStatus()); } catch (Exception e) { logger.warn("Exception from healthcheckHandler.getStatus, setting status to DOWN" , e); status = InstanceStatus.DOWN; } if (null != status) { applicationInfoManager.setInstanceStatus(status); } }
Eureka-Server 接收注册请求 注册应用实例信息的请求,映射 ApplicationResource#addInstance()
方法1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 @Produces ({"application/xml" , "application/json" })public class ApplicationResource { @POST @Consumes ({"application/json" , "application/xml" }) public Response addInstance (InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { logger.debug("Registering instance {} (replication={})" , info.getId(), isReplication); if (isBlank(info.getId())) { return Response.status(400 ).entity("Missing instanceId" ).build(); } else if (isBlank(info.getHostName())) { return Response.status(400 ).entity("Missing hostname" ).build(); } else if (isBlank(info.getIPAddr())) { return Response.status(400 ).entity("Missing ip address" ).build(); } else if (isBlank(info.getAppName())) { return Response.status(400 ).entity("Missing appName" ).build(); } else if (!appName.equals(info.getAppName())) { return Response.status(400 ).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build(); } else if (info.getDataCenterInfo() == null ) { return Response.status(400 ).entity("Missing dataCenterInfo" ).build(); } else if (info.getDataCenterInfo().getName() == null ) { return Response.status(400 ).entity("Missing dataCenterInfo Name" ).build(); } DataCenterInfo dataCenterInfo = info.getDataCenterInfo(); if (dataCenterInfo instanceof UniqueIdentifier) { String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId(); if (isBlank(dataCenterInfoId)) { boolean experimental = "true" .equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId" )); if (experimental) { String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id" ; return Response.status(400 ).entity(entity).build(); } else if (dataCenterInfo instanceof AmazonInfo) { AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo; String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId); if (effectiveId == null ) { amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId()); } } else { logger.warn("Registering DataCenterInfo of type {} without an appropriate id" , dataCenterInfo.getClass()); } } } registry.register(info, "true" .equals(isReplication)); return Response.status(204 ).build(); } }
调用 AbstractInstanceRegistry#register(...)
方法,注册应用实例信息,实现代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 public void register (InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { read.lock(); Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); REGISTER.increment(isReplication); if (gMap == null ) { final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>(); gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null ) { gMap = gNewMap; } } Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); if (existingLease != null && (existingLease.getHolder() != null )) { Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); logger.debug("Existing lease found (existing={}, provided={}" , existingLastDirtyTimestamp, registrationLastDirtyTimestamp); if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + " than the one that is being registered {}" , existingLastDirtyTimestamp, registrationLastDirtyTimestamp); logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant" ); registrant = existingLease.getHolder(); } } else { synchronized (lock) { if (this .expectedNumberOfRenewsPerMin > 0 ) { this .expectedNumberOfRenewsPerMin = this .expectedNumberOfRenewsPerMin + 2 ; this .numberOfRenewsPerMinThreshold = (int ) (this .expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()); } } logger.debug("No previous lease information found; it is new registration" ); } Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); if (existingLease != null ) { lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } gMap.put(registrant.getId(), lease); synchronized (recentRegisteredQueue) { recentRegisteredQueue.add(new Pair<Long, String>( System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")" )); } if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the " + "overrides" , registrant.getOverriddenStatus(), registrant.getId()); if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info("Not found overridden id {} and hence adding it" , registrant.getId()); overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); if (overriddenStatusFromMap != null ) { logger.info("Storing overridden status {} from map" , overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); } InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); if (InstanceStatus.UP.equals(registrant.getStatus())) { lease.serviceUp(); } registrant.setActionType(ActionType.ADDED); recentlyChangedQueue.add(new RecentlyChangedItem(lease)); registrant.setLastUpdatedTimestamp(); invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info("Registered instance {}/{} with status {} (replication={})" , registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); } finally { read.unlock(); } }
续租
Eureka-Client
Eureka-Client 向 Eureka-Server 发起注册应用实例成功后获得租约 ( Lease )。 Eureka-Client 固定间隔向 Eureka-Server 发起续租( renew ),避免租约过期。 默认情况下,租约有效期为 90 秒,续租频率为 30 秒。两者比例为 1 : 3 ,保证在网络异常等情况下,有三次重试的机会。
初始化定时任务 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 private void initScheduledTasks () { if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs); scheduler.schedule( new TimedSupervisorTask( "heartbeat" , scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); } } private volatile long lastSuccessfulHeartbeatTimestamp = -1 ;private class HeartbeatThread implements Runnable { public void run () { if (renew()) { lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } } boolean renew () { EurekaHttpResponse<InstanceInfo> httpResponse; try { httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null ); logger.debug("{} - Heartbeat status: {}" , PREFIX + appPathIdentifier, httpResponse.getStatusCode()); if (httpResponse.getStatusCode() == 404 ) { REREGISTER_COUNTER.increment(); logger.info("{} - Re-registering apps/{}" , PREFIX + appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == 200 ; } catch (Throwable e) { logger.error("{} - was unable to send heartbeat!" , PREFIX + appPathIdentifier, e); return false ; } }
Eureka-Server 接收续租请求 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 @PUT public Response renewLease ( @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication, @QueryParam ("overriddenstatus" ) String overriddenStatus, @QueryParam ("status" ) String status, @QueryParam ("lastDirtyTimestamp" ) String lastDirtyTimestamp) { boolean isFromReplicaNode = "true" .equals(isReplication); boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode); return response; } public boolean renew (final String appName, final String id, final boolean isReplication) { if (super .renew(appName, id, isReplication)) { replicateToPeers(Action.Heartbeat, appName, id, null , null , isReplication); return true ; } return false ; } public boolean renew (String appName, String id, boolean isReplication) { RENEW.increment(isReplication); Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); Lease<InstanceInfo> leaseToRenew = null ; if (gMap != null ) { leaseToRenew = gMap.get(id); } if (leaseToRenew == null ) { RENEW_NOT_FOUND.increment(isReplication); logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}" , appName, id); return false ; } else { InstanceInfo instanceInfo = leaseToRenew.getHolder(); if (instanceInfo != null ) { InstanceStatus overriddenInstanceStatus = this .getOverriddenInstanceStatus( instanceInfo, leaseToRenew, isReplication); if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) { logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}" + "; re-register required" , instanceInfo.getId()); RENEW_NOT_FOUND.increment(isReplication); return false ; } if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) { Object[] args = { instanceInfo.getStatus().name(), instanceInfo.getOverriddenStatus().name(), instanceInfo.getId() }; logger.info( "The instance status {} is different from overridden instance status {} for instance {}. " + "Hence setting the status to overridden status" , args); instanceInfo.setStatus(overriddenInstanceStatus); } } renewsLastMin.increment(); leaseToRenew.renew(); return true ; } }
Feign Feign 涉及到了两个注解,一个是@EnableFeignClients,用来开启 Feign 另一个是@FeignClient,用来标记要用 Feign 来拦截的请求接口。
注解说明 EnableFeignClients 注解 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @Retention (RetentionPolicy.RUNTIME)@Target (ElementType.TYPE)@Documented @Import (FeignClientsRegistrar.class)public @interface EnableFeignClients { String[] value() default {}; String[] basePackages() default {}; Class<?>[] basePackageClasses() default {}; Class<?>[] defaultConfiguration() default {}; Class<?>[] clients() default {}; } @Target (ElementType.TYPE)@Retention (RetentionPolicy.RUNTIME)@Documented public @interface FeignClient { @AliasFor ("name" ) String value () default "" ; @Deprecated String serviceId () default "" ; @AliasFor ("value" ) String name () default "" ; String qualifier () default "" ; String url () default "" ; boolean decode404 () default false ; Class<?>[] configuration() default {}; Class<?> fallback() default void .class; Class<?> fallbackFactory() default void .class; String path () default "" ; boolean primary () default true ; }
使用了@Import(FeignClientsRegistrar.class)
Spring会包含此FeignClientsRegistrar
类信息
因实现了ImportBeanDefinitionRegistrar 所以在后期会执行ImportBeanDefinitionRegistrar#registerBeanDefinitions
Feign 是怎么工作的?
第一是为接口定义的每个接口都生成一个实现方法,结果就是 SynchronousMethodHandler
对象。
第二是为该服务接口生成了动态代理。动态代理的实现是 ReflectiveFeign.FeignInvocationHanlder
,代理被调用的时候,会根据当前调用的方法,转到对应的SynchronousMethodHandler
。
当对接口的实例进行请求时(Autowire 的对象是某个ReflectiveFeign.FeignInvocationHanlder 的实例)
根据方法名进入了某个 SynchronousMethodHandler 对象的 invoke 方法。
SynchronousMethodHandler 其实也并不处理具体的 HTTP 请求,它关心的更多的是请求结果的处理。 HTTP 请求的过程,包括服务发现,都交给了当前 context 注册中的 Client 实现类比如: LoadBalancerFeignClient
。
Retry 的逻辑实际上已经提出来了 但是 fallback 并没有在上面体现,因为我们上面分析动态代理的过程中,用的是 Feign.Builder 而如果有 fallback 的情况下,会使用 HystrixFeign.Builder
这是 Feign.Builder的一个子类。
它在创建动态代理的时候,主要改了一个东西就是 invocationFactory
从默认的 InvocationHandlerFactory.Default
变成了一个内部匿名工厂,这个工厂的create 方法返回的不是 ReflectiveFeign.FeignInvocationHandler
,而是HystrixInvocationHandler
。
所以动态代理类换掉了,invoke 的逻辑就变了,每次Invoke都会带HystrixCommand。HystrixCommand
由SynchronousMethodHandler
和fallback
一起封装
1 HystrixInvocationHandler.this .dispatch.get(method).invoke(args);
在新的逻辑里简单的将方法转到对应的SynchronousMethodHandler
上面并且执行该对象。
工作流程 Spring 通过调用其 FeignClientsRegistrar#registerBeanDefinitions
方法来获取其提供的 bean definition。
这里会往 Registry 里面添加两种 BeanDefinition
,
一个是 FeignClientSpecification
主要可调整项是通过 EnableFeignClients 注解的 defaultConfiguration 参数传入。
1 registerClientConfiguration(registry, name,defaultAttrs.get("defaultConfiguration" ));
一个是负责注册 FeignClient
,分为以下几步
找到 basePackage 下面所有包含了 FeignClient 注解的类
读取类上面的 FeignClient 注解参数
如果该注解包括了 configuration 参数则先注册 configuration 所指定的类。 这个类也是包装在 FeignClientSpecification 里面的,在 FeignClient 上指定的 configuration 类是它的一个属性。
1 registerClientConfiguration(registry, name,attributes.get("configuration" ));
注册该注解了 FeignClient 的接口,生成 BeanDefinition 时是以 FeignClientFactoryBean 作为对象创建的,而使用了 FeignClient 注解的接口是作为该 Bean 的一个属性,同时,对于 FeignClient 注解配置的参数,比如 fallback 等都一并作为参数放入 BeanDefinition 中。
总结一下,这些 BeanDefinition 分为两类:
FeignClientFactoryBean,它包含了所有使用了 FeignClient 注解的接口信息以及注解上面的参数。
1 2 它的名字为注解的 URL 或者 value,比如 TEST-SERVICE 跟它上面的 configuration 创建出来的 bean 定义是同一个名字。
FeignAutoConfiguration
给当前环境装配Targeter
以及Client
1 2 @Autowired (required = false ) private List<FeignClientSpecification> configurations = new ArrayList<>();
后期各种AutoConfiguration
来实现自动装配
1 2 例如:FeignRibbonClientAutoConfiguration 装配Client为LoadBalancerFeignClient
FeignClientFactoryBean 它是一个工厂类,Spring Context 创建 Bean 实例时会调用它的 getObject 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 public Object getObject () throws Exception { FeignContext context = applicationContext.getBean(FeignContext.class); Feign.Builder builder = feign(context); if (!StringUtils.hasText(this .url)) { String url; if (!this .name.startsWith("http" )) { url = "http://" + this .name; } else { url = this .name; } url += cleanPath(); return loadBalance(builder, context, new HardCodedTarget<>(this .type, this .name, url)); } if (StringUtils.hasText(this .url) && !this .url.startsWith("http" )) { this .url = "http://" + this .url; } String url = this .url + cleanPath(); Client client = getOptional(context, Client.class); if (client != null ) { if (client instanceof LoadBalancerFeignClient) { client = ((LoadBalancerFeignClient)client).getDelegate(); } builder.client(client); } Targeter targeter = get(context, Targeter.class); return targeter.target(this , builder, context, new HardCodedTarget<>( this .type, this .name, url)); }
Configuration 与 FeignBuilder 关联 1 2 FeignContext context = applicationContext.getBean(FeignContext.class); Feign.Builder builder = feign(context);
spring-cloud 也提供了一个 Feign 的初始化配置类:FeignAutoConfiguration。 它初始化了一个新的 FeignContext bean,并且把所有的 configuration 都放在 FeignContext 里面。
FeignContext 继承了 NamedContextFactory,它会管理一批 Context 外部调用的时候会指定用哪个 context 来寻找对应的 bean 而 context 如果不存在,则会创建一个新的 AnnotationConfigApplicationContext。 创建 context 的时候,会用 name 去匹配已有的 configurations(加载该 FeignClient 注解里面提供的 configuration 属性类) 如果有同名的,则就将该 configuration 注册进 context 另外如果有 default 开头的 configuration,也会将其注册到 context 里面 最后调用 refresh 方法对 context 进行初始化。 初始化以后,相当于 configuration 里面提供的encoder,decoder 这些就逗号了。
在 FeignClientFactoryBean 创建 Bean 的时候它先从 applicationContext 里面找到已经构建好的 feignContext
初始化一个 FeignBuilder,FeignBuilder 会利用 context 里面包含的对应的 configuration 指定的 bean 获取指定的类,比如 decoder,encoder,retryer,errorDecoder。 然后再去寻找 context 中的 Client bean。
Client Bean 1 Client client = getOptional(context, Client.class);
如果当前路径里面有Ribbon
,那么 Spring Boot 启动时就会创建一个LoadBalancerFeignClient
。 如果没有,FeignAutoConfiguration
里面也会自己去创建 ApacheHttpClient
或者 OKHttpClient
。 FeignBuilder 会拿这个 client
配到自己里面。
Ribbon的负载均衡
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 LoadBalancerFeignClient 做为FeignRibbonClientAutoConfiguration#feignClient引入 LoadBalancerFeignClient#lbClient利用内部成员变量CachingSpringLoadBalancerFactory创建合适的Client 主要看是否有重试?RetryableFeignLoadBalancer:FeignLoadBalancer,这两个类都继承AbstractLoadBalancerAwareClient executeWithLoadBalancer就是执行父抽象类AbstractLoadBalancerAwareClient#executeWithLoadBalancer 由LoadBalancerCommand以RxJava方式去submit一个请求执行 LoadBalancerCommand#selectServer选择服务器间接引出LoadBalancerContext再引出ILoadBalancer 所以具体交给了ILoadBalancer来处理,ILoadBalancer通过配置IRule、IPing等信息 由BaseLoadBalancer引出了IRule.choose(key);默认为RoundRobinRule 并向EurekaClient获取注册列表的信息,并默认10秒一次向EurekaClient发送"ping",进而检查是否更新服务列表 最后得到注册列表后,ILoadBalancer根据IRule的策略进行负载均衡。 而RestTemplate被@LoadBalance注解后能过用负载均衡;主要是维护了一个被@LoadBalance注解的RestTemplate列表,并给列表中的RestTemplate添加拦截器,进而交给负载均衡器去处理。
Targeter 生产代理类 1 2 Targeter targeter = get(context, Targeter.class); return targeter.target(this , builder, context, new HardCodedTarget<>(this .type, this .name, url));
然后是最关键的一步,获取 Targeter 来生成动态代理类,在 FeignAutoConfiguration 里面, 指定了生成 HystrixTargeter。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 return loadBalance(builder, context, new HardCodedTarget<>(this .type,this .name, url));protected <T> T loadBalance (Feign.Builder builder, FeignContext context, HardCodedTarget<T> target) { Client client = getOptional(context, Client.class); if (client != null ) { builder.client(client); Targeter targeter = get(context, Targeter.class); return targeter.target(this , builder, context, target); } throw new IllegalStateException( "No Feign Client for loadBalancing defined. Did you forget to include spring-cloud-starter-ribbon?" ); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public <T> T target (FeignClientFactoryBean factory, Feign.Builder feign, FeignContext context, Target.HardCodedTarget<T> target) { if (!(feign instanceof feign.hystrix.HystrixFeign.Builder)) { return feign.target(target); } feign.hystrix.HystrixFeign.Builder builder = (feign.hystrix.HystrixFeign.Builder) feign; SetterFactory setterFactory = getOptional(factory.getName(), context, SetterFactory.class); if (setterFactory != null ) { builder.setterFactory(setterFactory); } Class<?> fallback = factory.getFallback(); if (fallback != void .class) { return targetWithFallback(factory.getName(), context, target, builder, fallback); } Class<?> fallbackFactory = factory.getFallbackFactory(); if (fallbackFactory != void .class) { return targetWithFallbackFactory(factory.getName(), context, target, builder, fallbackFactory); } return feign.target(target); }
如果 FeignClient 没有定义 fallback,或者说 Builder 不是 HystrixFeignBuilder,则直接用 FeignBuidler 的 target 方法生成代理。
1 2 3 4 5 6 7 8 9 10 11 12 13 public <T> T target (Target<T> target) { return build().newInstance(target); } public Feign build () { SynchronousMethodHandler.Factory synchronousMethodHandlerFactory = new SynchronousMethodHandler.Factory(client, retryer, requestInterceptors, logger, logLevel, decode404); ParseHandlersByName handlersByName = new ParseHandlersByName(contract, options, encoder, decoder, errorDecoder, synchronousMethodHandlerFactory); return new ReflectiveFeign(handlersByName, invocationHandlerFactory); }
生成代理 ReflectiveFeign 生成动态代理对象(newInstance)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 public <T> T newInstance (Target<T> target) { Map<String, MethodHandler> nameToHandler = targetToHandlersByName.apply(target); Map<Method, MethodHandler> methodToHandler = new LinkedHashMap<Method, MethodHandler>(); List<DefaultMethodHandler> defaultMethodHandlers = new LinkedList<DefaultMethodHandler>(); for (Method method : target.type().getMethods()) { if (method.getDeclaringClass() == Object.class) { continue ; } else if (Util.isDefault(method)) { DefaultMethodHandler handler = new DefaultMethodHandler(method); defaultMethodHandlers.add(handler); methodToHandler.put(method, handler); } else { methodToHandler.put(method, nameToHandler.get(Feign.configKey(target.type(), method))); } } InvocationHandler handler = factory.create(target, methodToHandler); T proxy = (T) Proxy.newProxyInstance(target.type().getClassLoader(), new Class<?>[]{target.type()}, handler); for (DefaultMethodHandler defaultMethodHandler : defaultMethodHandlers) { defaultMethodHandler.bindTo(proxy); } return proxy; } public Map<String, MethodHandler> apply (Target key) { List<MethodMetadata> metadata = contract.parseAndValidatateMetadata(key.type()); Map<String, MethodHandler> result = new LinkedHashMap<String, MethodHandler>(); for (MethodMetadata md : metadata) { BuildTemplateByResolvingArgs buildTemplate; if (!md.formParams().isEmpty() && md.template().bodyTemplate() == null ) { buildTemplate = new BuildFormEncodedTemplateFromArgs(md, encoder); } else if (md.bodyIndex() != null ) { buildTemplate = new BuildEncodedTemplateFromArgs(md, encoder); } else { buildTemplate = new BuildTemplateByResolvingArgs(md); } result.put(md.configKey(), factory.create(key, md, buildTemplate, options, decoder, errorDecoder)); } return result; }
synchronousMethodHandlerFactory 的 create 方法。
1 2 3 4 5 6 7 public MethodHandler create (Target<?> target, MethodMetadata md, RequestTemplate.Factory buildTemplateFromArgs, Options options, Decoder decoder, ErrorDecoder errorDecoder) { return new SynchronousMethodHandler(target, client, retryer, requestInterceptors, logger, logLevel, md, buildTemplateFromArgs, options, decoder, errorDecoder, decode404); }
ReflectiveFeign.FeignInvocationHanlder 的 invoke 方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public Object invoke (Object proxy, Method method, Object[] args) throws Throwable { if ("equals" .equals(method.getName())) { try { Object otherHandler = args.length > 0 && args[0 ] != null ? Proxy.getInvocationHandler(args[0 ]) : null ; return equals(otherHandler); } catch (IllegalArgumentException e) { return false ; } } else if ("hashCode" .equals(method.getName())) { return hashCode(); } else if ("toString" .equals(method.getName())) { return toString(); } return dispatch.get(method).invoke(args); }
SynchronousMethodHandler 的 invoke 方法主要是应用 encoder,decoder 以及 retry 等配置并且自身对于调用结果有一定的处理逻辑。
我们最关心的请求实现,实际上是在组装 SynchronousMethodHandler 的 client 参数上 即前面提到的,如果当前路径里面有 Ribbon,就是 LoadBalancerFeignClient 如果没有,根据配置生成 ApacheHttpClient 或者 OKHttpClient。 在 Ribbon 里面,实现了 Eureka 服务发现以及进行请求等动作,当然 Ribbon 里面还带了负载均衡逻辑。
SynchronousMethodHandler#invoke方法 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 public Object invoke (Object[] argv) throws Throwable { RequestTemplate template = buildTemplateFromArgs.create(argv); Retryer retryer = this .retryer.clone(); while (true ) { try { return executeAndDecode(template); } catch (RetryableException e) { retryer.continueOrPropagate(e); if (logLevel != Logger.Level.NONE) { logger.logRetry(metadata.configKey(), logLevel); } continue ; } } } Object executeAndDecode (RequestTemplate template) throws Throwable { Request request = targetRequest(template); if (logLevel != Logger.Level.NONE) { logger.logRequest(metadata.configKey(), logLevel, request); } Response response; long start = System.nanoTime(); try { response = client.execute(request, options); response.toBuilder().request(request).build(); } catch (IOException e) { if (logLevel != Logger.Level.NONE) { logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime(start)); } throw errorExecuting(request, e); } long elapsedTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); boolean shouldClose = true ; try { if (logLevel != Logger.Level.NONE) { response = logger.logAndRebufferResponse(metadata.configKey(), logLevel, response, elapsedTime); response.toBuilder().request(request).build(); } if (Response.class == metadata.returnType()) { if (response.body() == null ) { return response; } if (response.body().length() == null || response.body().length() > MAX_RESPONSE_BUFFER_SIZE) { shouldClose = false ; return response; } byte [] bodyData = Util.toByteArray(response.body().asInputStream()); return response.toBuilder().body(bodyData).build(); } if (response.status() >= 200 && response.status() < 300 ) { if (void .class == metadata.returnType()) { return null ; } else { return decode(response); } } else if (decode404 && response.status() == 404 && void .class != metadata.returnType()) { return decode(response); } else { throw errorDecoder.decode(metadata.configKey(), response); } } catch (IOException e) { if (logLevel != Logger.Level.NONE) { logger.logIOException(metadata.configKey(), logLevel, e, elapsedTime); } throw errorReading(request, response, e); } finally { if (shouldClose) { ensureClosed(response.body()); } }
Feign调用流程 业务FeignClient
的调用->ReflectiveFeign.FeignInvocationHandler#invoker
invoker代码引出如下
1 return dispatch.get(method).invoke(args);
SynchronousMethodHandler#invoke
创建合适的RequestTemplate
执行executeAndDecode
通过targetRequest
将RequestTemplate
转换为request
client.execute(request, options);
(由client引出LoadBalancerFeignClient
)
LoadBalancerFeignClient#execute
根据request
构建ribbonRequest
lbClient(clientName).executeWithLoadBalancer(ribbonRequest,requestConfig).toResponse();
lbClient(clientName)
(通过lbClientFactory工厂创建合适的client去执行负载请求)
获取相关的ILoadBalancer
,IClientConfig
构建FeignLoadBalancer
FeignLoadBalancer#executeWithLoadBalancer
执行点在基础抽象类AbstractLoadBalancerAwareClient#executeWithLoadBalancer
构建命令LoadBalancerCommand
命令执行command.submit
selectServer
选取合适的服务列表
AbstractLoadBalancerAwareClient.this.execute
(引出FeignLoadBalancer#execute
)
FeignLoadBalancer#execute
Client.Default#execute
(HttpURLConnection请求回复)
Ribbon ribbon的6个主要组件:IRule
、IPing
、ServerList
、ServerListFilter
、ServerListUpdater
、ILoadBalancer
功能说明 Ribbon主要包括如下功能
1.支持通过DNS和IP和服务端通信
2.可以根据算法从多个服务中选取一个服务进行访问
3.通过将客户端和服务器分成几个区域(zone)来建立客户端和服务器之间的关系。客户端尽量访问和自己在相同区域(zone)的服务,减少服务的延迟
4.保留服务器的统计信息,ribbon可以实现用于避免高延迟或频繁访问故障的服务器
5.保留区域(zone)的统计数据,ribbon可以实现避免可能访问失效的区域(zone)
Ribbon主要组件 Ribbon主要包含如下组件:
1.IRule
2.IPing
3.ServerList
4.ServerListFilter
5.ServerListUpdater
6.IClientConfig
7.ILoadBalancer
IRule
功能:根据特定算法中从服务列表中选取一个要访问的服务
常用IRule实现有以下几种:
a. 由于多次访问故障而处于断路器跳闸状态 b. 并发的连接数量超过阈值 然后对剩余的服务列表按照RoundRobinRule策略进行访问
WeightedResponseTimeRule 根据平均响应时间计算所有服务的权重,响应时间越快,服务权重越重、被选中的概率越高。刚启动时,如果统计信息不足,则使用RoundRobinRule策略,等统计信息足够,会切换到WeightedResponseTimeRule。
RetryRule 先按照RoundRobinRule的策略获取服务,如果获取服务失败,则在指定时间内会进行重试,获取可用的服务
BestAvailableRule 此负载均衡器会先过滤掉由于多次访问故障而处于断路器跳闸状态的服务,然后选择一个并发量最小的服务
RandomRule 随机获取一个服务
IPing
功能:在后台运行的一个组件,用于检查服务列表是否都活
NIWSDiscoveryPing 不执行真正的ping。如果Discovery Client认为是在线,则程序认为本次心跳成功,服务活着
PingUrl 此组件会使用HttpClient调用服务的一个URL,如果调用成功,则认为本次心跳成功,表示此服务活着。
NoOpPing 永远返回true,即认为服务永远活着
DummyPing 默认实现,默认返回true,即认为服务永远活着
ServerList
功能:存储服务列表。分为静态和动态。如果是动态的后台有个线程会定时刷新和过滤服务列表
常用ServerList 实现有以下几种:
ConfigurationBasedServerList 从配置文件中获取所有服务列表
配置例子:
1 sample-client.ribbon.listOfServers=www.microsoft.com:80,www.yahoo.com:80,www.google.com:80
DiscoveryEnabledNIWSServerList
从Eureka Client中获取服务列表。此值必须通过属性中的VipAddress来标识服务器集群。DynamicServerListLoadBalancer会调用此对象动态获取服务列表
DomainExtractingServerList 代理类,根据ServerList的值实现具体的逻辑
ServerListFilter 该接口允许过滤配置或动态获取的具有所需特性的服务器列表。ServerListFilter是DynamicServerListLoadBalancer用于过滤从ServerList实现返回的服务器的组件。
常用ServerListFilter 实现有以下几种:
ZoneAffinityServerListFilter 过滤掉所有的不和客户端在相同zone的服务,如果和客户端相同的zone不存在,才不过滤不同zone有服务。
启用此配置使用以下配置
1 <clientName>.ribbon.EnableZoneAffinity=true
ZonePreferenceServerListFilter
ZoneAffinityServerListFilter的子类。和ZoneAffinityServerListFilter相似,但是比较的zone是发布环境里面的zone。 过滤掉所有和客户端环境里的配置的zone的不同的服务,如果和客户端相同的zone不存在才不进行过滤。
ZoneAffinityServerListFilter的子类。此过滤器确保客户端仅看到由ServerList实现返回的整个服务器的固定子集。 它还可以定期用新服务器替代可用性差的子集中的服务器。
要启用此过滤器,请指定以下属性:
1 2 3 4 5 6 <clientName>.ribbon.NIWSServerListClassName=com.netflix.niws.loadbalancer.DiscoveryEnabledNIWSServerList # the server must register itself with Eureka server with VipAddress "myservice" <clientName>.ribbon.DeploymentContextBasedVipAddresses=myservice <clientName>.ribbon.NIWSServerListFilterClassName=com.netflix.loadbalancer.ServerListSubsetFilter # only show client 5 servers. default is 20. <clientName>.ribbon.ServerListSubsetFilter.size=5
ServerListUpdater
功能:被DynamicServerListLoadBalancer用于动态的更新服务列表。
常用的实现类:
默认的实现策略。此对象会启动一个定时线程池,定时执行更新策略
EurekaNotificationServerListUpdater
当收到缓存刷新的通知,会更新服务列表。
IClientConfig
功能:定义各种配置信息,用来初始化ribbon客户端和负载均衡器
常用IClientConfig实现有以下几种:
IClientConfig的默认实现,配置文件里的部分值为ribbon。
ILoadBalancer 定义软件负载平衡器操作的接口。动态更新一组服务列表及根据指定算法从现有服务器列表中选择一个服务
DynamicServerListLoadBalancer
DynamicServerListLoadBalancer组合Rule、IPing、ServerList、ServerListFilter、ServerListUpdater 实现类,实现动态更新和过滤更新服务列表
这是DynamicServerListLoadBalancer的子类,主要加入zone的因素。统计每个zone的平均请求的情况,保证从所有zone选取对当前客户端服务最好的服务组列表
默认的ribbon实现类
IClientConfig ribbonClientConfig: DefaultClientConfigImpl
IRule ribbonRule: ZoneAvoidanceRule
IPing ribbonPing: DummyPing
ServerList ribbonServerList: ConfigurationBasedServerList
ServerListFilter ribbonServerListFilter: ZonePreferenceServerListFilter
ILoadBalancer ribbonLoadBalancer: ZoneAwareLoadBalancer
ServerListUpdater ribbonServerListUpdater: PollingServerListUpdater
通过配置文件配置Ribbon的主要组件 1 2 3 4 5 <clientName>.ribbon.NFLoadBalancerClassName=xx <clientName>.ribbon.NFLoadBalancerRuleClassName=xx <clientName>.ribbon.NFLoadBalancerPingClassName=xx <clientName>.ribbon.NIWSServerListClassName=xx <clientName>.ribbon.NIWSServerListFilterClassName=xx
或者
1 2 3 4 5 6 7 8 9 10 11 12 13 @Configuration public class MyDefaultRibbonConfig { @Bean public IPing ribbonPing () { return new MyPingUrl(new NIWSDiscoveryPing()); } }
Hystrix 简介
方法
执行方式
#execute()
同步调用,返回直接结果
#queue()
异步调用,返回 java.util.concurrent.Future
#observe()
异步调用,返回 rx.Observable 。向 Observable 注册 rx.Subscriber 处理结果
#toObservable()
未调用,返回 rx.Observable 。向 Observable 注册 rx.Subscriber 处理结果
#toObservable() 方法 :未做订阅,返回干净的 Observable 。这就是为什么上文说”未调用” 。
#observe() 方法 :调用 #toObservable() 方法的基础上,向 Observable 注册 rx.subjects.ReplaySubject 发起订阅 。 ReplaySubject 会发射所有来自原始 Observable 的数据给观察者,无论它们是何时订阅的。感兴趣的同学可以阅读 《ReactiveX/RxJava文档中文版 —— Subject》 。
#queue() 方法 :调用 #toObservable() 方法的基础上,调用:
Observable#toBlocking() 方法 :将 Observable 转换成阻塞的 rx.observables.BlockingObservable 。
BlockingObservable#toFuture() 方法 :返回可获得 #run() 抽象方法执行结果的 Future 。
#run() 方法 :子类实现该方法,执行正常的业务逻辑。
#execute() 方法 :调用 #queue() 方法的基础上,调用 Future#get() 方法,同步返回 #run() 的执行结果。
应用
线程池隔离 Hystrix通过命令模式,将每个类型的业务请求封装成对应的命令请求
查询订单->订单Command
查询商品->商品Command
查询用户->用户Command。 每个类型的Command对应一个线程池。创建好的线程池是被放入到ConcurrentHashMap中 1 2 final static ConcurrentHashMap<String, HystrixThreadPool> threadPools = new ConcurrentHashMap<String, HystrixThreadPool>();threadPools.put("hystrix-order" , new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public ThreadPoolExecutor getThreadPool (final HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize, HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { final ThreadFactory threadFactory = getThreadFactory(threadPoolKey); final int dynamicCoreSize = corePoolSize.get(); final int dynamicMaximumSize = maximumPoolSize.get(); if (dynamicCoreSize > dynamicMaximumSize) { logger.error("Hystrix ThreadPool configuration at startup for : " + threadPoolKey.name() + " is trying to set coreSize = " + dynamicCoreSize + " and maximumSize = " + dynamicMaximumSize + ". Maximum size will be set to " + dynamicCoreSize + ", the coreSize value, since it must be equal to or greater than the coreSize value" ); return new ThreadPoolExecutor(dynamicCoreSize, dynamicCoreSize, keepAliveTime.get(), unit, workQueue, threadFactory); } else { return new ThreadPoolExecutor(dynamicCoreSize, dynamicMaximumSize, keepAliveTime.get(), unit, workQueue, threadFactory); } }
运用至项目-线程隔离 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 public class GetOrderCommand extends HystrixCommand <List > { OrderService orderService; public GetOrderCommand (String name) { super (Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup" )) .andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey" )) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(name)) .andCommandPropertiesDefaults( HystrixCommandProperties.Setter() .withExecutionTimeoutInMilliseconds(5000 ) ) .andThreadPoolPropertiesDefaults( HystrixThreadPoolProperties.Setter() .withMaxQueueSize(10 ) .withCoreSize(2 ) ) ); } @Override protected List run () throws Exception { return orderService.getOrderList(); } public static class UnitTest { @Test public void testGetOrder () { Future<List> future =new GetOrderCommand("hystrix-order" ).queue(); } } }
线程池隔离 - 优缺点
线程池隔离的优点:
应用程序会被完全保护起来,即使依赖的一个服务的线程池满了,也不会影响到应用程序的其他部分。
我们给应用程序引入一个新的风险较低的客户端lib的时候,如果发生问题,也是在本lib中,并不会影响到其他内容,因此我们可以大胆的引入新lib库。
当依赖的一个失败的服务恢复正常时,应用程序会立即恢复正常的性能。
如果我们的应用程序一些参数配置错误了,线程池的运行状况将会很快显示出来,比如延迟、超时、拒绝等。同时可以通过动态属性实时执行来处理纠正错误的参数配置。
如果服务的性能有变化,从而需要调整,比如增加或者减少超时时间,更改重试次数,就可以通过线程池指标动态属性修改,而且不会影响到其他调用请求。
除了隔离优势外,hystrix拥有专门的线程池可提供内置的并发功能,使得可以在同步调用之上构建异步的外观模式,这样就可以很方便的做异步编程(Hystrix引入了Rxjava异步框架)。
线程池隔离的缺点:
线程池的主要缺点就是它增加了计算的开销,每个业务请求(被包装成命令)在执行的时候,会涉及到请求排队,调度和上下文切换。不过Netflix公司内部认为线程隔离开销足够小,不会产生重大的成本或性能的影响。
信号量隔离 将属性execution.isolation.strategy
设置为SEMAPHORE 象这样 ExecutionIsolationStrategy.SEMAPHORE,则Hystrix使用信号量而不是默认的线程池来做隔离。
线程池方式下业务请求线程和执行依赖的服务的线程不是同一个线程; 信号量方式下业务请求线程和执行依赖服务的线程是同一个线程
运用至项目-信号量隔离 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public class CommandUsingSemaphoreIsolation extends HystrixCommand <String > { private final int id; public CommandUsingSemaphoreIsolation (int id) { super (Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup" )) .andCommandPropertiesDefaults(HystrixCommandProperties.Setter() .withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE))); this .id = id; } @Override protected String run () { return "ValueFromHashMap_" + id; } }
信号量隔离 - 优缺点 信号量隔离的方式是限制了总的并发数,每一次请求过来,请求线程和调用依赖服务的线程是同一个线程,那么如果不涉及远程RPC调用(没有网络开销)则使用信号量来隔离,更为轻量,开销更小。 当我们依赖的服务是极低延迟的,比如访问内存缓存,就没有必要使用线程池的方式,那样的话开销得不偿失
熔断器 Hystrix在运行过程中会向每个commandKey对应的熔断器报告成功、失败、超时和拒绝的状态 熔断器维护计算统计的数据,根据这些统计的信息来确定熔断器是否打开。
如果打开,后续的请求都会被截断。然后会隔一段时间默认是5s,尝试半开,放入一部分流量请求进来,相当于对依赖服务进行一次健康检查 如果恢复,熔断器关闭,随后完全恢复调用。
步骤4开始,Hystrix会检查Circuit Breaker的状态。 如果Circuit Breaker的状态为开启状态,Hystrix将不会执行对应指令,而是直接进入失败处理状态(图中8 Fallback)。 如果Circuit Breaker的状态为关闭状态,Hystrix会继续进行线程池、任务队列、信号量的检查(图中5)
熔断器 - 参数 1: circuitBreaker.enabled
是否启用熔断器,默认是TURE。 2: circuitBreaker.forceOpen
熔断器强制打开,始终保持打开状态。默认值FLASE。 3: circuitBreaker.forceClosed
熔断器强制关闭,始终保持关闭状态。默认值FLASE。 4: circuitBreaker.errorThresholdPercentage
设定错误百分比,默认值50%,例如一段时间(10s)内有100个请求,其中有55个超时或者异常返回了,那么这段时间内的错误百分比是55%,大于了默认值50%,这种情况下触发熔断器-打开。 5: circuitBreaker.requestVolumeThreshold
默认值20.意思是至少有20个请求才进行errorThresholdPercentage错误百分比计算。比如一段时间(10s)内有19个请求全部失败了。 错误百分比是100%,但熔断器不会打开,因为requestVolumeThreshold的值是20. 这个参数非常重要,熔断器是否打开首先要满足这个条件 6: circuitBreaker.sleepWindowInMilliseconds
半开试探休眠时间,默认值5000ms。当熔断器开启一段时间之后比如5000ms,会尝试放过去一部分流量进行试探,确定依赖服务是否恢复。
熔断模拟示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public class GetOrderCircuitBreakerCommand extends HystrixCommand <String > { public GetOrderCircuitBreakerCommand (String name) { super (Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ThreadPoolTestGroup" )) .andCommandKey(HystrixCommandKey.Factory.asKey("testCommandKey" )) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey(name)) .andCommandPropertiesDefaults( HystrixCommandProperties.Setter() .withCircuitBreakerEnabled(true ) .withCircuitBreakerForceOpen(false ) .withCircuitBreakerForceClosed(false ) .withCircuitBreakerErrorThresholdPercentage(5 ) .withCircuitBreakerRequestVolumeThreshold(10 ) .withCircuitBreakerSleepWindowInMilliseconds(5000 ) ) .andThreadPoolPropertiesDefaults( HystrixThreadPoolProperties.Setter() .withMaxQueueSize(10 ) .withCoreSize(2 ) ) ); } @Override protected String run () throws Exception { Random rand = new Random(); if (1 ==rand.nextInt(2 )){ throw new Exception("make exception" ); } return "running: " ; } @Override protected String getFallback () { return "fallback: " ; } public static class UnitTest { @Test public void testCircuitBreaker () throws Exception { for (int i=0 ;i<25 ;i++){ Thread.sleep(500 ); HystrixCommand<String> command = new GetOrderCircuitBreakerCommand("testCircuitBreaker" ); String result = command.execute(); System.out.println("call times:" +(i+1 )+" result:" +result +" isCircuitBreakerOpen: " +command.isCircuitBreakerOpen()); } } } }
每个熔断器默认维护10个bucket,每秒一个bucket,每个blucket记录成功,失败,超时,拒绝的状态,默认错误超过50%且10秒内超过20个请求进行中断拦截。
回退降级 在Hystrix执行非核心链路功能失败的情况下,如何处理,比如返回默认值等。 如果我们要回退或者降级处理,代码上需要实现HystrixCommand.getFallback()方法或者是HystrixObservableCommand. HystrixObservableCommand()。
降级回退方式
Fail Fast 快速失败
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @Override protected String run () { if (throwException) { throw new RuntimeException("failure from CommandThatFailsFast" ); } else { return "success" ; } } @Override protected Observable<String> resumeWithFallback () { if (throwException) { return Observable.error(new Throwable("failure from CommandThatFailsFast" )); } else { return Observable.just("success" ); } }
Fail Silent 无声失败 返回null,空Map,空List
1 2 3 4 5 6 7 8 9 10 11 12 @Override protected String getFallback () { return null ; } @Override protected List<String> getFallback () { return Collections.emptyList(); } @Override protected Observable<String> resumeWithFallback () { return Observable.empty(); }
Fallback: Static 返回默认值
1 2 3 4 5 6 7 8 @Override protected Boolean getFallback () { return true ; } @Override protected Observable<Boolean> resumeWithFallback () { return Observable.just( true ); }
Fallback: Stubbed 自己组装一个值返回
1 2 3 4 protected UserAccount getFallback () { return new UserAccount(customerId, "Unknown Name" , countryCodeFromGeoLookup, true , true , false ); }
Fallback: Cache via Network 利用远程缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 @Override protected String getFallback () { return new FallbackViaNetwork(id).execute(); } private static class FallbackViaNetwork extends HystrixCommand <String > { private final int id; public FallbackViaNetwork (int id) { super (Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("RemoteServiceX" )) .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueFallbackCommand" )) .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("RemoteServiceXFallback" ))); this .id = id; } @Override protected String run () { MemCacheClient.getValue(id); } @Override protected String getFallback () { return null ; } }
Hystrix为我们提供了一套线上系统容错的技术实践方法,我们通过在系统中引入Hystrix的jar包可以很方便的使用线程隔离、熔断、回退等技术。同时它还提供了监控页面配置,方便我们管理查看每个接口的调用情况。 像spring cloud这种微服务构建模式中也引入了Hystrix,我们可以放心使用Hystrix的线程隔离技术,来防止雪崩这种可怕的致命性线上故障。
注解使用
同步执行
1 2 3 4 5 6 public class UserService { @HystrixCommand public User getUserById (String id) { return userResource.getUserById(id); } }
异步执行
1 2 3 4 5 6 7 8 9 @HystrixCommand public Future<User> getUserByIdAsync (final String id) { return new AsyncResult<User>() { @Override public User invoke () { return userResource.getUserById(id); } }; }
Properties属性介绍 每个方法的相关Properties1 2 3 4 5 6 7 8 9 10 11 12 13 14 @HystrixCommand (groupKey = "hello" ,commandKey = "hello-service" ,threadPoolKey = "hello-pool" , threadPoolProperties = { @HystrixProperty (name = "coreSize" , value = "30" ), @HystrixProperty (name = "maxQueueSize" , value = "101" ), @HystrixProperty (name = "keepAliveTimeMinutes" , value = "2" ), @HystrixProperty (name = "queueSizeRejectionThreshold" , value = "15" ), @HystrixProperty (name = "metrics.rollingStats.numBuckets" , value = "12" ), @HystrixProperty (name = "metrics.rollingStats.timeInMilliseconds" , value = "1440" ) }, commandProperties = { @HystrixProperty (name = "execution.isolation.thread.timeoutInMilliseconds" ,value = "5000" ), @HystrixProperty (name = "execution.isolation.strategy" ,value = "THREAD" )}, fallbackMethod = "helloFallBack" )
@DefaultProperties
Class类上默认的Properties,不需要每个function都定义一次Properties1 2 3 4 5 6 7 8 9 10 11 @DefaultProperties (groupKey = "DefaultGroupKey" )class Service { @HystrixCommand public Object commandInheritsDefaultProperties () { return null ; } @HystrixCommand (groupKey = "SpecificGroupKey" ) public Object commandOverridesGroupKey () { return null ; } }
fallback回退
fallback方法必须和指定fallback方法的主方法在一个类中
fallback方法的参数必须要和主方法的参数一致 否则不生效
使用fallback方法需要根据依赖服务设置合理的超时时间,即1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 commandProperties = { @HystrixProperty ( name = "execution.isolation.thread.timeoutInMilliseconds" , value = "5000" )} @HystrixCommand (fallbackMethod = "fallback1" )User getUserById (String id) { throw new RuntimeException("getUserById command failed" ); } @HystrixCommand (fallbackMethod = "fallback2" )User fallback1 (String id, Throwable e) { assert "getUserById command failed" .equals(e.getMessage()); throw new RuntimeException("fallback1 failed" ); } @HystrixCommand (fallbackMethod = "fallback3" )User fallback2 (String id) { throw new RuntimeException("fallback2 failed" ); } @HystrixCommand (fallbackMethod = "staticFallback" )User fallback3 (String id, Throwable e) { assert "fallback2 failed" .equals(e.getMessage()); throw new RuntimeException("fallback3 failed" ); } User staticFallback (String id, Throwable e) { assert "fallback3 failed" .equals(e.getMessage()); return new User("def" , "def" ); } @Test public void test () { assertEquals("def" , getUserById("1" ).getName()); }
缓存功能
CacheResult @CacheResult方法可以用在我们之前的Service方法上,表示给该方法开启缓存,默认情况下方法的所有参数都将作为缓存的key如下:
1 2 3 4 5 @CacheResult @HystrixCommand public Book test6 (Integer id,String aa) { return restTemplate.getForObject("http://HELLO-SERVICE/getbook5/{1}" , Book.class, id); }
CacheKey 当然除了使用默认数据之外,我们也可以使用@CacheKey来指定缓存的key
1 2 3 4 5 @CacheResult @HystrixCommand public Book test6 (@CacheKey Integer id,String aa) { return restTemplate.getForObject("http://HELLO-SERVICE/getbook5/{1}" , Book.class, id); }
CacheRemove 这个当然是用来让缓存失效的注解,用法也很简单
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @CacheRemove (commandKey = "test6" )@HystrixCommand public Book test7 (@CacheKey Integer id) { return null ; } @RequestMapping ("/test6" )public Book test6 () { HystrixRequestContext.initializeContext(); Book b1 = bookService.test6(2 ); bookService.test7(2 ); Book b2 = bookService.test6(2 ); Book b3 = bookService.test6(2 ); return b1; }
HystrixCollapser 合并命令 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @Service public class MyService { Logger logger = LoggerFactory.getLogger(MyService.class); @HystrixCollapser (batchMethod = "getIds" , collapserProperties = {@HystrixProperty (name = "timerDelayInMilliseconds" , value = "200" )}, scope = GLOBAL) public Future<String> getId (String id) { System.out.println("getId" ); return null ; } @HystrixCommand public List<String> getIds (List<String> ids) { System.out.println("executing on thread id: " + Thread.currentThread().getName() + ", id:" +ids.toString()); return ids.stream().map(data->String.valueOf(Integer.valueOf(data)+1 )).collect(Collectors.toList()); } } @Controller public class CollegeController { @Autowired private CollegeService collegeService; @Autowired private MyService service; @RequestMapping ("/collapser" ) @ResponseBody public String collapser () throws InterruptedException, ExecutionException { Future<String> f1 = service.getId("1" ); Future<String> f2 = service.getId("2" ); String result1 = f1.get(); String result2 = f2.get(); return result1+result2; } }
流程
toObservable 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 public Observable<R> toObservable () { final AbstractCommand<R> _cmd = this ; final Action0 terminateCommandCleanup = new Action0() { @Override public void call () { if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) { handleCommandEnd(false ); } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) { handleCommandEnd(true ); } } }; final Action0 unsubscribeCommandCleanup = new Action0() { @Override public void call () { if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) { if (!_cmd.executionResult.containsTerminalEvent()) { _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey); try { executionHook.onUnsubscribe(_cmd); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe" , hookEx); } _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult .addEvent((int ) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED); } handleCommandEnd(false ); } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) { if (!_cmd.executionResult.containsTerminalEvent()) { _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey); try { executionHook.onUnsubscribe(_cmd); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onUnsubscribe" , hookEx); } _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult .addEvent((int ) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED); } handleCommandEnd(true ); } } }; final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() { @Override public Observable<R> call () { if (commandState.get().equals(CommandState.UNSUBSCRIBED)) { return Observable.never(); } return applyHystrixSemantics(_cmd); } }; final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() { @Override public R call (R r) { R afterFirstApplication = r; try { afterFirstApplication = executionHook.onComplete(_cmd, r); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onComplete" , hookEx); } try { return executionHook.onEmit(_cmd, afterFirstApplication); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onEmit" , hookEx); return afterFirstApplication; } } }; final Action0 fireOnCompletedHook = new Action0() { @Override public void call () { try { executionHook.onSuccess(_cmd); } catch (Throwable hookEx) { logger.warn("Error calling HystrixCommandExecutionHook.onSuccess" , hookEx); } } }; return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call () { if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) { IllegalStateException ex = new IllegalStateException("This instance can only be executed once. Please instantiate a new instance." ); throw new HystrixRuntimeException(FailureType.BAD_REQUEST_EXCEPTION, _cmd.getClass(), getLogMessagePrefix() + " command executed multiple times - this is not permitted." , ex, null ); } commandStartTimestamp = System.currentTimeMillis(); if (properties.requestLogEnabled().get()) { if (currentRequestLog != null ) { currentRequestLog.addExecutedCommand(_cmd); } } final boolean requestCacheEnabled = isRequestCachingEnabled(); final String cacheKey = getCacheKey(); if (requestCacheEnabled) { HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey); if (fromCache != null ) { isResponseFromCache = true ; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } } Observable<R> hystrixObservable = Observable.defer(applyHystrixSemantics) .map(wrapWithAllOnNextHooks); Observable<R> afterCache; if (requestCacheEnabled && cacheKey != null ) { HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd); HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache); if (fromCache != null ) { toCache.unsubscribe(); isResponseFromCache = true ; return handleRequestCacheHitAndEmitValues(fromCache, _cmd); } else { afterCache = toCache.toObservable(); } } else { afterCache = hystrixObservable; } return afterCache .doOnTerminate(terminateCommandCleanup) .doOnUnsubscribe(unsubscribeCommandCleanup) .doOnCompleted(fireOnCompletedHook); } }); }
applyHystrixSemantics 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() { @Override public Observable<R> call () { if (commandState.get().equals(CommandState.UNSUBSCRIBED)) { return Observable.never(); } return applyHystrixSemantics(_cmd); } }; private Observable<R> applyHystrixSemantics (final AbstractCommand<R> _cmd) { executionHook.onStart(_cmd); if (circuitBreaker.allowRequest()) { final TryableSemaphore executionSemaphore = getExecutionSemaphore(); final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false ); final Action0 singleSemaphoreRelease = new Action0() { @Override public void call () { if (semaphoreHasBeenReleased.compareAndSet(false , true )) { executionSemaphore.release(); } } }; final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() { @Override public void call (Throwable t) { eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey); } }; if (executionSemaphore.tryAcquire()) { try { executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis()); return executeCommandAndObserve(_cmd) .doOnError(markExceptionThrown) .doOnTerminate(singleSemaphoreRelease) .doOnUnsubscribe(singleSemaphoreRelease); } catch (RuntimeException e) { return Observable.error(e); } } else { return handleSemaphoreRejectionViaFallback(); } } else { return handleShortCircuitViaFallback(); } }
executeCommandAndObserve 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 private Observable<R> executeCommandAndObserve (final AbstractCommand<R> _cmd) { final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread(); final Action1<R> markEmits = new Action1<R>() { @Override public void call (R r) { if (shouldOutputOnNextEvents()) { executionResult = executionResult.addEvent(HystrixEventType.EMIT); eventNotifier.markEvent(HystrixEventType.EMIT, commandKey); } if (commandIsScalar()) { long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int ) latency, executionResult.getOrderedList()); eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey); executionResult = executionResult.addEvent((int ) latency, HystrixEventType.SUCCESS); circuitBreaker.markSuccess(); } } }; final Action0 markOnCompleted = new Action0() { @Override public void call () { if (!commandIsScalar()) { long latency = System.currentTimeMillis() - executionResult.getStartTimestamp(); eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int ) latency, executionResult.getOrderedList()); eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey); executionResult = executionResult.addEvent((int ) latency, HystrixEventType.SUCCESS); circuitBreaker.markSuccess(); } } }; final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() { @Override public Observable<R> call (Throwable t) { Exception e = getExceptionFromThrowable(t); executionResult = executionResult.setExecutionException(e); if (e instanceof RejectedExecutionException) { return handleThreadPoolRejectionViaFallback(e); } else if (t instanceof HystrixTimeoutException) { return handleTimeoutViaFallback(); } else if (t instanceof HystrixBadRequestException) { return handleBadRequestByEmittingError(e); } else { if (e instanceof HystrixBadRequestException) { eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey); return Observable.error(e); } return handleFailureViaFallback(e); } } }; final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() { @Override public void call (Notification<? super R> rNotification) { setRequestContextIfNeeded(currentRequestContext); } }; Observable<R> execution; if (properties.executionTimeoutEnabled().get()) { execution = executeCommandWithSpecifiedIsolation(_cmd) .lift(new HystrixObservableTimeoutOperator<R>(_cmd)); } else { execution = executeCommandWithSpecifiedIsolation(_cmd); } return execution.doOnNext(markEmits) .doOnCompleted(markOnCompleted) .onErrorResumeNext(handleFallback) .doOnEach(setRequestContext); }
executeCommandWithSpecifiedIsolation 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 private Observable<R> executeCommandWithSpecifiedIsolation (final AbstractCommand<R> _cmd) { if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) { return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call () { executionResult = executionResult.setExecutionOccurred(); if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); } metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD); if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) { return Observable.error(new RuntimeException("timed out before executing run()" )); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) { HystrixCounters.incrementGlobalConcurrentThreads(); threadPool.markThreadExecution(); endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); executionResult = executionResult.setExecutedInThread(); try { executionHook.onThreadStart(_cmd); executionHook.onRunStart(_cmd); executionHook.onExecutionStart(_cmd); return getUserExecutionObservable(_cmd); } catch (Throwable ex) { return Observable.error(ex); } } else { return Observable.error(new RuntimeException("unsubscribed before executing run()" )); } } }).doOnTerminate(new Action0() { @Override public void call () { if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) { handleThreadEnd(_cmd); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) { } } }).doOnUnsubscribe(new Action0() { @Override public void call () { if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) { handleThreadEnd(_cmd); } if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) { } } }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() { @Override public Boolean call () { return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT; } })); } else { return Observable.defer(new Func0<Observable<R>>() { @Override public Observable<R> call () { executionResult = executionResult.setExecutionOccurred(); if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) { return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name())); } metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE); endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey()); try { executionHook.onRunStart(_cmd); executionHook.onExecutionStart(_cmd); return getUserExecutionObservable(_cmd); } catch (Throwable ex) { return Observable.error(ex); } } }); } }
国内查看评论需要代理~