小心程序猿QAQ 潜水
  • 4发帖数
  • 4主题数
  • 0关注数
  • 0粉丝
开启左侧

再不看就删了!超详细的Ribbon源码解析

[复制链接]
小心程序猿QAQ 发表于 2021-10-19 13:10:29 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
Ribbon简介

什么是Ribbon?
Ribbon是springcloud下的客户端负载均衡器,消费者在通过服务别名调用服务时,须要通过Ribbon做负载均衡获取现实的服务调用地址,然后通过httpclient的方式举行当地RPC远程调用。
Ribbon原理
Ribbon负载均衡算法主要是轮询算法,分为以下几步:

  • 根据服务别名,从eureka获取服务提供者的列表
  • 将列表缓存到当地
  • 根据具体策略获取服务提供者
Ribbon的核心是负载均衡管理,另还有5个大功能点。如下图:

                               
登录/注册后可看大图

源码分析

事前准备

  • 先搭建一个SpringCloud的项目,也可以从我的github上下载。地址:github.com/mmcLine/spr…
  • 拷贝以下代码 @Configuration public class RestTemplateConfiguration { @Bean @LoadBalanced public RestTemplate getRestTemplate(){ return new RestTemplate(); } } @Autowired private RestTemplate restTemplate; @GetMapping("/testRibbon/{id}") public User getTodayStatistic(@PathVariable("id") Integer id){ String url ="http://STUDY-USER/user/getUserById?id="+id; return restTemplate.getForObject(url, User.class); } 复制代码
代码都准备好了,可以开始分析了。

  • 执行调用
http://localhost:8005/trade/testRibbon/2
为什么这么就能调用到服务提供者的方法?
打断点,可以看到restTemplate里有两个拦截器,根据名字可以推断RetryLoadBalancerInterceptor是关键。

                               
登录/注册后可看大图

跟踪到RetryLoadBalancerInterceptor类
@Override        public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,                                                                                final ClientHttpRequestExecution execution) throws IOException {                final URI originalUri = request.getURI();                //获取到service的name                final String serviceName = originalUri.getHost();                Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);                //根据serviceName和LoadBalancerClient,LoadBalancedRetryPolicy里面包罗了RibbonLoadBalancerContext和ServiceInstanceChooser                final LoadBalancedRetryPolicy retryPolicy = lbRetryFactory.createRetryPolicy(serviceName,                                loadBalancer);                RetryTemplate template = createRetryTemplate(serviceName, request, retryPolicy);                //执行方法会进入到doExecute方法                return template.execute(context -> {                        ServiceInstance serviceInstance = null;                        if (context instanceof LoadBalancedRetryContext) {                                LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext) context;                                serviceInstance = lbContext.getServiceInstance();                        }                        if (serviceInstance == null) {                                serviceInstance = loadBalancer.choose(serviceName);                        }                        ClientHttpResponse response = RetryLoadBalancerInterceptor.this.loadBalancer.execute(                                        serviceName, serviceInstance,                                        requestFactory.createRequest(request, body, execution));                        int statusCode = response.getRawStatusCode();                        if (retryPolicy != null && retryPolicy.retryableStatusCode(statusCode)) {                                byte[] bodyCopy = StreamUtils.copyToByteArray(response.getBody());                                response.close();                                throw new ClientHttpResponseStatusCodeException(serviceName, response, bodyCopy);                        }                        return response;                }, new LoadBalancedRecoveryCallback() {                        //This is a special case, where both parameters to LoadBalancedRecoveryCallback are                        //the same.  In most cases they would be different.                        @Override                        protected ClientHttpResponse createResponse(ClientHttpResponse response, URI uri) {                                return response;                        }                });        }复制代码doExecute方法:
protected  T doExecute(RetryCallback retryCallback,                        RecoveryCallback recoveryCallback, RetryState state)                        throws E, ExhaustedRetryException {        //省略部门代码                        /*                         * We allow the whole loop to be skipped if the policy or context already                         * forbid the first try. This is used in the case of external retry to allow a                         * recovery in handleRetryExhausted without the callback processing (which                         * would throw an exception).                         */                         //执行逻辑的关键方法                        while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {                                }复制代码继续跟踪canRetry方法
  @Override    public boolean canRetry(RetryContext context) {        LoadBalancedRetryContext lbContext = (LoadBalancedRetryContext)context;        if(lbContext.getRetryCount() == 0  && lbContext.getServiceInstance() == null) {            //We haven't even tried to make the request yet so return true so we do            //设置选中的服务提供者            lbContext.setServiceInstance(serviceInstanceChooser.choose(serviceName));            return true;        }        return policy.canRetryNextServer(lbContext);    }复制代码我们跟踪serviceInstanceChooser.choose(serviceName)看看怎么通过serviceName选服务提供者的。
@Override        public ServiceInstance choose(String serviceId) {            //选择server                Server server = getServer(serviceId);                if (server == null) {                        return null;                }                return new RibbonServer(serviceId, server, isSecure(server, serviceId),                                serverIntrospector(serviceId).getMetadata(server));        }复制代码跟踪getServer方法
protected Server getServer(ILoadBalancer loadBalancer) {                if (loadBalancer == null) {                        return null;                }                //可以看出是loadBalancer在选择                return loadBalancer.chooseServer("default"); // TODO: better handling of key        }复制代码继续深入
public Server chooseServer(Object key) {        if (counter == null) {            counter = createCounter();        }        //有一个调用次数在+1        counter.increment();        if (rule == null) {            return null;        } else {            try {                //委托给了IRule,以是Irule是负载均衡的关键,最厥后总结                return rule.choose(key);            } catch (Exception e) {                logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);                return null;            }        }    }复制代码查看Irule的实现
public Server choose(Object key) {        ILoadBalancer lb = getLoadBalancer();        //lb.getAllServers里面是全部的服务提供者列表        Optional server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);        if (server.isPresent()) {            return server.get();        } else {            return null;        }           }复制代码跟踪chooseRoundRobinAfterFiltering方法
public Optional chooseRoundRobinAfterFiltering(List servers, Object loadBalancerKey) {        //拿到筛选后的servers        List eligible = getEligibleServers(servers, loadBalancerKey);        if (eligible.size() == 0) {            return Optional.absent();        }        //incrementAndGetModulo方法拿到下标,然后根据list.get取到一个服务        return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));    }复制代码至此就拿到了具体的服务提供者。
但是到这里还有个问题?

  • 怎么根据服务名拿到server的?
有一个ServerList接口是用于拿到服务列表的。我们使用的loadBalancer(ZoneAwareLoadBalancer)的父类DynamicServerListLoadBalancer类的构造方法里,有一个restOfinit方法
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,                                         ServerList serverList, ServerListFilter filter,                                         ServerListUpdater serverListUpdater) {        super(clientConfig, rule, ping);        this.serverListImpl = serverList;        this.filter = filter;        this.serverListUpdater = serverListUpdater;        if (filter instanceof AbstractServerListFilter) {            ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());        }        restOfInit(clientConfig);    }复制代码跟踪restOfInit方法
void restOfInit(IClientConfig clientConfig) {        boolean primeConnection = this.isEnablePrimingConnections();        // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()        this.setEnablePrimingConnections(false);        enableAndInitLearnNewServersFeature();                //用于获取全部的serverList        updateListOfServers();        if (primeConnection && this.getPrimeConnections() != null) {            this.getPrimeConnections()                    .primeConnections(getReachableServers());        }        this.setEnablePrimingConnections(primeConnection);        LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());    }复制代码继续跟踪updateListOfServers方法
public void updateListOfServers() {        List servers = new ArrayList();        if (serverListImpl != null) {            //查询serverList            servers = serverListImpl.getUpdatedListOfServers();            LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",                    getIdentifier(), servers);            if (filter != null) {                servers = filter.getFilteredListOfServers(servers);                LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",                        getIdentifier(), servers);            }        }        updateAllServerList(servers);    }复制代码继续跟踪源码到obtainServersViaDiscovery方法,
private List obtainServersViaDiscovery() {        List serverList = new ArrayList();    //eurekaClientProvider.get()会去获取EurekaClient        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {            logger.warn("EurekaClient has not been initialized yet, returning an empty list");            return new ArrayList();        }        EurekaClient eurekaClient = eurekaClientProvider.get();        //vipAddresses就是serviceName        if (vipAddresses!=null){            for (String vipAddress : vipAddresses.split(",")) {                // if targetRegion is null, it will be interpreted as the same region of client                //此处获取到服务的信息                List listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);                for (InstanceInfo ii : listOfInstanceInfo) {                    if (ii.getStatus().equals(InstanceStatus.UP)) {                        if(shouldUseOverridePort){                            if(logger.isDebugEnabled()){                                logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);                            }                            // copy is necessary since the InstanceInfo builder just uses the original reference,                            // and we don't want to corrupt the global eureka copy of the object which may be                            // used by other clients in our system                            InstanceInfo copy = new InstanceInfo(ii);                            if(isSecure){                                ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();                            }else{                                ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();                            }                        }                        DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);                        des.setZone(DiscoveryClient.getZone(ii));                        serverList.add(des);                    }                }                if (serverList.size()>0 && prioritizeVipAddressBasedServers){                    break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers                }            }        }        return serverList;    }复制代码综合上面可以看出,最终是通过eurekaClient去拿到服务列表的。
那么如果服务列表发生变革怎么刷新呢?
是通过CacheRefreshThread在定时线程池里面执行,核心拉取方法是fetchRegistry
Iping
Iping是用于探测服务列表中的服务是否正常,如果不正常,则从eureka拉取服务列表并更新。
在BaseLoadBalancer里面有一个setupPingTask方法,启动定时任务,30秒一次定时向EurekaClient发送“ping”
public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats,            IPing ping, IPingStrategy pingStrategy) {                logger.debug("LoadBalancer [{}]:  initialized", name);                this.name = name;        this.ping = ping;        this.pingStrategy = pingStrategy;        setRule(rule);        setupPingTask();        lbStats = stats;        init();    }复制代码Iping的具体逻辑在PingTask类里。
Irule总结:
Irule是负载均衡的规则:
我这里默认是使用的是ZoneAvoidanceRule,还有许多种策略:

  • RandomRule: 随机
  • RoundRobinRule: 轮询
  • RetryRule: 先按照RoundRobinRule的策略获取服务,如果获取服务失败则在指定时间内会举行重试,获取可用的服务
  • WeightedResponseTimeRule: 对RoundRobinRule的扩展,响应速度越快的实例选择权重越大,越容易被选择
  • BestAvailableRule:会先过滤掉由于多次访问故障而处于断路器跳闸状态的服务,然后选择一个并发量最小的服务
  • AvailabilityFilteringRule:先过滤掉故障实例,再选择并发较小的实例
  • ZoneAvoidanceRule:默认规则,复合判断server所在地区的性能和server的可用性选择服务器
properties配置方式如下:
STUDY-USER是服务名
STUDY-USER.ribbon.NFLoadBalancerRuleClassName=com.netflix.loadbalancer.RoundRobinRule

作者:别掉头发了
链接:https://juejin.cn/post/7020403359270043685
来源:稀土掘金
著作权归作者全部。商业转载请联系作者获得授权,非商业转载请注明出处。


精彩评论2

吴凡86 发表于 2021-10-20 12:16:15 | 显示全部楼层
转发了
幻415 发表于 2021-10-19 20:06:58 | 显示全部楼层
转发了
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

猜你喜欢
在线客服邮箱
wxcy#wkgb.net

邮箱地址#换为@

Powered by 创意电子 ©2018-现在 专注资源实战分享源码下载站联盟商城