Java码农之路 潜水
  • 13发帖数
  • 13主题数
  • 0关注数
  • 0粉丝
开启左侧

面试突击系列:Dubbo 的核心源码和原理剖析

[复制链接]
Java码农之路 发表于 2021-4-19 16:32:39 | 显示全部楼层 |阅读模式 打印 上一主题 下一主题
本 Chat 中,我们将基于 Dubbo 的 2.7.8 版本来讲解,主要是从口试官提问的角度,然后通过几个例子层层递进分析源码,让大家能够一步步在口试的氛围中深入相识 Dubbo 核心原理。话不多说了,我们进入第一个口试提问。
Dubbo 整体框架原理

口试官:Dubbo 是用来做什么的?内部的大概原理能讲一下吗?
首先,Dubbo 就是一个 RPC 框架,用来使开辟者实现远程调用如同本地调用一样方便的框架,只需要简朴配置和引入提供者的接口,就可以直接通过接口调用到远程服务提供的实现。 内部的大概原理就是提供者提供接口,并且将接口暴露成为一个可供访问的服务,然后把提供者服务的 IP 和端口注册到注册中心,消耗者在调用本地接口的时候,Dubbo 框架就会先根据接口类路径从注册中心拉取对应的 IP 和端口,然后连接到这个提供者实验业务逻辑,获取到效果后返回到消耗者本地接口的方法中。 听起来有点绕,直接看下图:

                               
登录/注册后可看大图

现在先有个大概的印象,下面会更具体介绍整个过程,用一步一图的方式分析。
Dubbo 的提供者核心源码和原理

口试官:那服务提供者是怎样将自己的服务暴露出去的,然后消耗者为什么能调用?
在回答这个问题之前,我们先从一个源码中的小例子入手,然后层层深入跟进源码:
private static void startWithExport() throws InterruptedException {    // 服务提供者的配置    ServiceConfig service = new ServiceConfig();    service.setInterface(DemoService.class);    service.setRef(new DemoServiceImpl());    service.setApplication(new ApplicationConfig("dubbo-demo-api-provider"));    service.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));    // 最重要的入口,将服务暴露出去    service.export();    System.out.println("dubbo service started");    // 卡住主进程    new CountDownLatch(1).await();}这里的关键就是 service.export() 这行代码,跟进去后会发现,Dubbo 会将各种需要通报的数据比如方法入参、上下文参数等都写入到一个 URL 中,并且根据不同协议实验服务导出,这里我们全部按照默认 Zookeeper 注册中心的实现来跟进,因此可以看到下面这一段关键代码:
// 生成动态代理对象Invoker invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);// 真正导出服务Exporter exporter = PROTOCOL.export(wrapperInvoker);这里的 PROTOCOL 会经过 ProtocolListenerWrapper、ProtocolFilterWrapper,然后才会到 RegistryProtocol,而在 RegistryProtocol 中,我们发现在这里是通过 doLocalExport() 再一次履历 ProtocolListenerWrapper、ProtocolFilterWrapper 然后到 DubboProtocol,而在这个过程中,ProtocolFilterWrapper 会实验所有已注册的 Filter。 然后 RegistryProtocol 还会获取注册中心的地址,并将本提供者注册到了注册中心,而注册中心的注册实在就是调用了 ZookeeperRegistry 的 doRegister() 直接将提供者的 URL 作为临时节点写入到了 Zookeeper 的 providers 目次下面。 顺便说一句,想相识更多的关于 Zookeeper 的相干源码和原理知识,欢迎订阅我的另一篇 Chat:口试突击系列:Zookeeper 的核心源码和原理分析。
try {    // 将服务注册到zk    zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));} catch (Throwable e) {    throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);}注册完成之后,RegistryProtocol 的使命就临时结束了。到这里我们画个图表示一下:

                               
登录/注册后可看大图

接下来我们重点看一下 DubboProtocol 做了什么。 首先 DubboProtocol 通过调用 Exchangers#bind() 创建了一个 ExchangeServer,而在 bind() 中经过 HeaderExchanger、Transporters、Transporter$Adaptive 的调用,最终来到了 NettyTransporter,在这里创建了一个 NettyServer。
@Overridepublic RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {    // 初始化netty服务端    return new NettyServer(url, handler);}看到这里,我们就非常清楚地相识到了,Dubbo 的服务提供者实在就是使用 Netty 框架创建了一个 Server,由此我们可以猜测到,Dubbo 的服务消耗者应该是作为 Netty 的 Client 连接到 Server 这里进行通讯的。 所以 Dubbo 提供者是怎样暴露服务的呢,实在就是干了两件事:一个是将提供者的信息注册到注册中心,一个是启动 NettyServer 作为服务端提供服务。至于 NettyServer 里面具体是怎样初始化以及 Netty 参数优化,我们下面再讲。
Dubbo 的消耗者核心源码和原理

口试官:服务消耗者是怎样仅仅通过一个接口类直接调用到提供者的,并且做到失败重试、负载均衡的?
同样的,我们从一个消耗者的小例子入手,层层分析源码:
private static void runWithRefer() {    ReferenceConfig reference = new ReferenceConfig();    reference.setApplication(new ApplicationConfig("dubbo-demo-api-consumer"));    // 注册中心配置    reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));    // 设置接口类    reference.setInterface(DemoService.class);    // 关键代码:消耗者入口    DemoService service = reference.get();    String message = service.sayHello("dubbo");    System.out.println(message);}这里我们看到消耗者设置了注册中心配置和接口类,然后通过调用 reference.get() 就实现了服务的引入,接着就是直接调用接口的方法。因此我们从 reference.get() 入手分析。 从 ReferenceConfig 中可以看到,在创建代理类的时候,实在是使用了 REF_PROTOCOL.refer() 实现的:
// 只有一个注册中心的时候if (urls.size() == 1) {    // 重点代码,创建服务引用    invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));}而这个 REF_PROTOCOL 也是跟提供者一样,经过了 ProtocolListenerWrapper、ProtocolFilterWrapper 的包装,然后来到了 RegistryProtocol。在 RegistryProtocol 中,首先是获取了注册中心的地址和集群容错策略,然后将本消耗者作为临时节点注册到了注册中心的 consumers,接着从注册中心将接口类的提供者信息拉取到本地缓存起来,并且监听该接口类的提供者列表的变更事件,最后将接口类包装成一个包罗集群容错策略的 Invoker 并返回,Invoker 的默认实现就是 MockClusterInvoker。而 MockClusterInvoker 里面的 invoker 对象实在就是 FailoverClusterInvoker,然后同时这个 invoker 还持有了 interceptors 拦截器链条。估计到时候就是经过层层拦截器链条之后,再调用 FailoverClusterInvoker 实现失败重试功能。关键代码如下:
// 3、包装机器容错策略到invokerInvoker invoker = cluster.join(directory);public class MockClusterWrapper implements Cluster {    @Override    public  Invoker join(Directory directory) throws RpcException {        return new MockClusterInvoker(directory,                // 这里的cluster默认是FailoverCluster                this.cluster.join(directory));    }}这里需要关注一下 RegistryDirectory#subscribe(),这个方法是订阅注册中心的提供者列表,拉取到了提供者的 URL 之后,通过 toInvokers() 将 URL 转换为 Invoker 对象,也就是下面这一段关键代码:
if (enabled) {    // 关键代码:这里调用Dubbo协议转换服务到invoker    // 这里的protocol.refer(serviceType, url)的效果是ProtocolFilterWrapper    invoker = new InvokerDelegate(protocol.refer(serviceType, url), url, providerUrl);}这里的 protocol.refer(serviceType, url) 就会经过 ProtocolFilterWrapper 的包装,ProtocolFilterWrapper 会实验注册的消耗者 Filter,然后才到了 DubboProtocol。而 DubboProtocol 中的关键就是创建了 ExchangeClient,代码如下:
// 创建即时连接else {    client = Exchangers.connect(url, requestHandler);}而这个 Exchangers.connect() 我们就很熟悉了,跟上面的提供者差不多,在connect() 中经过 HeaderExchanger、Transporters、Transporter$Adaptive 的调用,最终来到了 NettyTransporter,在这里创建了一个 NettyClient,也就是说,Dubbo 的消耗者实在就是作为一个 Netty 的客户端,连接到提供者的 Netty 服务端,然后进行数据传输。 而前面返回的 invoker 对象也在 ReferenceConfig 中创建为 InvokerInvocationHandler 对象,所以 InvokerInvocationHandler 就是具体的代理类了,当调用我们代码定义的接口类的时候,实际就是调用 InvokerInvocationHandler 的 invoke() 方法,代码如下:
// 6、创建服务代理,这里默认调用的是 JavassistProxyFactory 的 getProxy()方法// invoker就是MockClusterInvokerreturn (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));public class JavassistProxyFactory extends AbstractProxyFactory {    @Override    @SuppressWarnings("unchecked")    public  T getProxy(Invoker invoker, Class[] interfaces) {        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));    }}所以这里我们梳理一下这个过程:

                               
登录/注册后可看大图

所以下面我们从 InvokerInvocationHandler 的 invoke() 方法入手,开始分析一次完整的 Dubbo 调用是怎样串起来的。 InvokerInvocationHandler 组装一下 RPC 调用参数然后直接调用 MockClusterInvoker 的 invoke(),并进行异常的捕获处理。然后 MockClusterInvoker 根据参数是否有 Mock 模式,如果有则返回 Mock 效果,如果没有则继承往下调用 FailoverClusterInvoker。 FailoverClusterInvoker 就是根据重试次数循环,捕获到异常后根据负载均衡策略重新选择 invoker 发起调用。关键代码如下:
// 3、使用循环,失败重试RpcException le = null; // last exception.List invoked = new ArrayList(copyInvokers.size()); // invoked invokers.Set providers = new HashSet(len);for (int i = 0; i < len; i++) {    // 重试时,进行重新选择,克制重试时invoker列表已经发生变革    // 注意:如果列表发生了变革,那么invoked判断会失效,因为invoker实例已经改变    if (i > 0) {        // 3.1、如果当前实例已经被销毁,则抛出异常        checkWhetherDestroyed();        // 3.2、重新获取所有服务提供者        copyInvokers = list(invocation);        // check again        // 3.3、重新检查一下        checkInvokers(copyInvokers, invocation);    }    // 3.4、根据负载均衡策略选择invoker    Invoker invoker = select(loadbalance, invocation, copyInvokers, invoked);    invoked.add(invoker);    RpcContext.getContext().setInvokers((List) invoked);    try {        // 3.5、具体发起远程调用        // 这里拿到的invoker对象是RegistryDirectory$InvokerDelegate        Result result = invoker.invoke(invocation);        ...        return result;    } catch (RpcException e) {        if (e.isBiz()) { // biz exception.            throw e;        }        le = e;    } catch (Throwable e) {        le = new RpcException(e.getMessage(), e);    } finally {        providers.add(invoker.getUrl().getAddress());    }}而负载均衡策略的则是在 select() 方法中实现,默认是使用 RandomLoadBalance 也就是随机选择策略,并且在重新选择的时候会先清除已经使用过的 invoker。 相信看到这里,大家就能回答前面的口试题了。 接下来 FailoverClusterInvoker 的 invoke() 会开始经过 Filter 的过滤链条处理,然后最终进入了 DubboInvoker 的 invoke() 方法,在这里会先从线程池获取一个线程,然后将前面初始化好的 Netty 客户端发起远程连接并且传输数据到服务提供者。
Netty 的参数优化

口试官:前面说到 Dubbo 是使用 Netty 作为通讯框架,那么使用 Netty 有什么好处?
这个口试题实在就是在问 Netty 的使用,并且最好能将一些 Netty 的参数调优表现出来。 前面我们提到的,服务提供者的核心实在是初始化了 Netty 服务端并且将 IP 和端口上报给注册中心,然后服务消耗者就从注册中心获取接口类对应的 IP 和端口,便可以通过 Netty 客户端连接到服务端,将数据传输到服务端进行处理,处理完成后吸收返回的数据,转换为接口方法的返回类型,返回给该方法。 因此我们先看一下 Netty 服务端的初始化:
// 设置Netty的boss线程池和worker线程池bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");workerGroup = NettyEventLoopFactory.eventLoopGroup(        getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),        "NettyServerWorker");首先这里是 reactor 线程模型的使用, 这里设置了2个线程池,bossGroup主要用于处理客户端的accept连接、断开连接等,workerGroup处理数据的传输,包括read、write事件和pipeline链条中的handler。 简朴提一下,reactor 线程模型就是将线程池的职责分开,处理连接的属于低频操纵且阻塞时间相对较长(因为需要进行 TCP 3次握手),处理数据的 IO 传输属于高频且和业务相干性精密,使用互不干扰的线程池可以提高效率。 接下来看一下参数的设置:
// 设置netty的业务处理类final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);channels = nettyServerHandler.getChannels();bootstrap.group(bossGroup, workerGroup)    .channel(NettyEventLoopFactory.serverSocketChannelClass())    //一样平常来说,一个端口释放后会等待两分钟之后才能再被使用,SO_REUSEADDR是让端口释放后立即就可以被再次使用,用于断开后的重连。    .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)    // 不启用开启Nagle算法,Nagle算法会收集网络小包再一次性发送,不启用则是即时发送,提高时效性    .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)    // ByteBuf的分配器(重用缓冲区),也就是使用对象池重复利用内存块    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)实在除了上面那几个参数外,还有一些比较重要的参数在这块源码中没有表现,因此我从Seata(分布式事务框架)将相干的源码摘出来:
this.serverBootstrap      .group(this.eventLoopGroupBoss, this.eventLoopGroupWorker)      .channel(NettyServerConfig.SERVER_CHANNEL_CLAZZ)      // TCP 3次握手的队列缓冲区巨细  .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getSoBackLogSize())      //一样平常来说,一个端口释放后会等待两分钟之后才能再被使用,SO_REUSEADDR是让端口释放后立即就可以被再次使用。  .option(ChannelOption.SO_REUSEADDR, true)      // 连接保活  .childOption(ChannelOption.SO_KEEPALIVE, true)      // 不启用开启Nagle算法,Nagle算法会收集网络小包再一次性发送,不启用则是即时发送  .childOption(ChannelOption.TCP_NODELAY, true)      // 发送数据缓冲区巨细  .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSendBufSize())      // 吸收数据缓冲区巨细  .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketResvBufSize())      // 控制网络水位,将网络传输速度维持在比较安稳的状态  .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK,      new WriteBufferWaterMark(nettyServerConfig.getWriteBufferLowWaterMark(),          nettyServerConfig.getWriteBufferHighWaterMark()))  .localAddress(new InetSocketAddress(listenPort))而这个参数 WRITEBUFFERWATER_MARK 是属于新版本才有的,原理大概就是设置一个高水位、一个低水位,当输出的数据速率高于高水位时,则暂停一下 write 事件,改为处理 read 事件,这样就可以控制网络传输的速度比较安稳,克制大流量打死网卡。 最后看一下处理的链条:
.childHandler(new ChannelInitializer() {    @Override    protected void initChannel(SocketChannel ch) throws Exception {        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);        // 默认不启用SSL        if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {            ch.pipeline().addLast("negotiation",                    SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));        }        ch.pipeline()                // 解码器                .addLast("decoder", adapter.getDecoder())                // 编码器                .addLast("encoder", adapter.getEncoder())                // 心跳检查handler                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))                // 业务处理handler                .addLast("handler", nettyServerHandler);    }});// bind// 绑定本地端口,并启动监听服务ChannelFuture channelFuture = bootstrap.bind(getBindAddress());channelFuture.syncUninterruptibly();channel = channelFuture.channel();这里我们可以看到 pipeline() 里面加入了很多 handler,实在这里就是使用了责任链设计模式,吸收到的网络请求,都会依次使用每个 handler 处理一下。 编解码器涉及到了怎样处理网络的粘包、拆包的处理。 IdleStateHandler 心跳处理器,这个 handler 实在是 Netty 提供的,目标是维持 server 端和 client 端的长连接,如果没有设置这个 handler,就会经常出现 TCP 的超时时间到了,然后客户端直接断开和服务端的连接,这个笔者之前遇到过这个 bug,现象就是创建连接一段时间之后,没有新的网络传输的时候,莫名其妙地断开了连接。
下面我们再看一下 Netty 客户端的初始化:
protected void doOpen() throws Throwable {    // 1、创建业务handler    final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this);    // 2、创建启动器并配置    bootstrap = new Bootstrap();    bootstrap.group(NIO_EVENT_LOOP_GROUP)            // 连接保活            .option(ChannelOption.SO_KEEPALIVE, true)            // 不启用开启Nagle算法,Nagle算法会收集网络小包再一次性发送,不启用则是即时发送,提高时效性            .option(ChannelOption.TCP_NODELAY, true)            // ByteBuf的分配器(重用缓冲区),也就是使用对象池重复利用内存块            .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)            //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())            .channel(socketChannelClass());    // 设置连接超时    bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.max(3000, getConnectTimeout()));    // 3、添加handler到连接的pipeline    bootstrap.handler(new ChannelInitializer() {        @Override        protected void initChannel(SocketChannel ch) throws Exception {            int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());            if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {                ch.pipeline().addLast("negotiation", SslHandlerInitializer.sslClientHandler(getUrl(), nettyClientHandler));            }            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this);            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug                    // 解码器                    .addLast("decoder", adapter.getDecoder())                    // 编码器                    .addLast("encoder", adapter.getEncoder())                    // 心跳检查handler                    .addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS))                    // 业务处理handler                    .addLast("handler", nettyClientHandler);            String socksProxyHost = ConfigUtils.getProperty(SOCKS_PROXY_HOST);            if(socksProxyHost != null) {                int socksProxyPort = Integer.parseInt(ConfigUtils.getProperty(SOCKS_PROXY_PORT, DEFAULT_SOCKS_PROXY_PORT));                Socks5ProxyHandler socks5ProxyHandler = new Socks5ProxyHandler(new InetSocketAddress(socksProxyHost, socksProxyPort));                ch.pipeline().addFirst(socks5ProxyHandler);            }        }    });}客户端的参数与服务端的大同小异,需要注意的是客户端只需要一个线程池而已。
编解码器怎样处理粘包和拆包

口试官:怎样办理 TCP 网络传输中的拆包和粘包?
拆包是指在网络传输过程中,一份数据被拆分为多次传输,每次只传输了一部分。粘包是指在网络传输中,两份数据合并在一起传输过去了。Dubbo 的网络拆包和粘包的处理是通过在 Netty 的处理链条中添加的编解码器实现的。 Dubbo 的编码器是 DubboCodec 的父类 ExchangeCodec 实现的。我们看一下代码:
public class ExchangeCodec extends TelnetCodec {    @Override    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {        // 1、对请求信息进行编码        if (msg instanceof Request) {            encodeRequest(channel, buffer, (Request) msg);        }        // 2、对响应信息进行编码        else if (msg instanceof Response) {            encodeResponse(channel, buffer, (Response) msg);        }        // 3、其他信息进行编码        else {            super.encode(channel, buffer, msg);        }    }    protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {        // 获取序列化扩展实现        Serialization serialization = getSerialization(channel);        // 创建Dubbo协议扩展头字节数组,HEADER_LENGTH=16        // header.        byte[] header = new byte[HEADER_LENGTH];        // 把魔数0xdabb写入协议头        // set magic number.        Bytes.short2bytes(MAGIC, header);        // 设置请求类型与序列化类型,标记到协议头        // set request and serialization flag.        header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());        if (req.isTwoWay()) {            header[2] |= FLAG_TWOWAY;        }        if (req.isEvent()) {            header[2] |= FLAG_EVENT;        }        // 将请求id设置到协议头        // set request id.        Bytes.long2bytes(req.getId(), header, 4);        // 使用serialization将对象数据部分进行编码,并把协议数据部分写入缓存        // encode request data.        int savedWriteIndex = buffer.writerIndex();        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);        if (req.isEvent()) {            encodeEventData(channel, out, req.getData());        } else {            encodeRequestData(channel, out, req.getData(), req.getVersion());        }        // 刷新缓存        out.flushBuffer();        if (out instanceof Cleanable) {            ((Cleanable) out).cleanup();        }        bos.flush();        bos.close();        // 检查payload(协议数据部分)是否合法        int len = bos.writtenBytes();        checkPayload(channel, len);        Bytes.int2bytes(len, header, 12);        // 将协议头写入缓存        // write        buffer.writerIndex(savedWriteIndex);        buffer.writeBytes(header); // write header.        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);    }    protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {        int savedWriteIndex = buffer.writerIndex();        try {            //  获取序列化扩展实现            Serialization serialization = getSerialization(channel);            // 创建Dubbo协议扩展头字节数组, HEADER_LENGTH为 16            // header.            byte[] header = new byte[HEADER_LENGTH];            // 把魔数写入协议头            // set magic number.            Bytes.short2bytes(MAGIC, header);            // 设立请求类型与序列化类型,标记到协议头            // set request and serialization flag.            header[2] = serialization.getContentTypeId();            if (res.isHeartbeat()) {                header[2] |= FLAG_EVENT;            }            // 设置响应类型到第4字节位置            // set response status.            byte status = res.getStatus();            header[3] = status;            // 将请求ID设直到协议头            // set request id.            Bytes.long2bytes(res.getId(), header, 4);            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);            ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);            // 使用serialization对数据部分进行编码,并把协议数据部分写入缓存            ObjectOutput out = serialization.serialize(channel.getUrl(), bos);            // encode response data or error message.            if (status == Response.OK) {                if (res.isHeartbeat()) {                    encodeEventData(channel, out, res.getResult());                } else {                    encodeResponseData(channel, out, res.getResult(), res.getVersion());                }            } else {                out.writeUTF(res.getErrorMessage());            }            // 刷新缓存            out.flushBuffer();            if (out instanceof Cleanable) {                ((Cleanable) out).cleanup();            }            bos.flush();            bos.close();            int len = bos.writtenBytes();            //  检查payload (协议数据部分)是否合法            checkPayload(channel, len);            Bytes.int2bytes(len, header, 12);            // 将协议头写入缓存            // write            buffer.writerIndex(savedWriteIndex);            buffer.writeBytes(header); // write header.            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);        } catch (Throwable t) {            // clear buffer            buffer.writerIndex(savedWriteIndex);            ......        }    }}上面的代码实在可以看到 Dubbo 协议的组成,分为 2 大块:header 和 data 部分。header 总包罗了 16 字节,前 2 字节为魔数,标记一个数据帧的开始,然后是1字节的请求类型和序列化标记 id,然后 1 字节是只在响应报文设置的效果码,然后 8 字节是请求 id,最后 4 字节是 body 内容的巨细。 接下来看下解码的部分,同时可以从里面学习到对于网络拆包粘包的处理。InternalDecoder 就是解码的内部实现类。
private class InternalDecoder extends ByteToMessageDecoder {    @Override    protected void decode(ChannelHandlerContext ctx, ByteBuf input, List out) throws Exception {        ChannelBuffer message = new NettyBackedChannelBuffer(input);        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);        // decode object.        do {            // 先保存未读取之前的位置            int saveReaderIndex = message.readerIndex();            // 调用DubboCodec对数据进行解码            Object msg = codec.decode(channel, message);            // 如果遇到拆包,则重置message为之前的位置            if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {                message.readerIndex(saveReaderIndex);                break;            } else {                //is it possible to go here ?                if (saveReaderIndex == message.readerIndex()) {                    throw new IOException("Decode without read data.");                }                // 否则,走到这里就是读取乐成的                if (msg != null) {                    // 把解码乐成的对象放入out列表                    out.add(msg);                }            }        } while (message.readable());    }}这里我们可以看到 Dubbo 对于拆包的处理,实在就是判断一下是否遇到了 Codec2.DecodeResult.NEEDMOREINPUT,如果遇到了则放弃前面读取的部分,然后等待下次 read 通道里面的数据。至于粘包的处理,因为这里读取 header 的时候,都是按照固定从长度读取,并且读取 data 的时候,也是按照 header 里面指定的长度读取的,所以读到的效果肯定是完整的,不会出现多余的字节,如果不完整就是走拆包的处理逻辑。 接下来 codec.decode() 就是开始分析协议的内容了:
@Overridepublic Object decode(Channel channel, ChannelBuffer buffer) throws IOException {    int readable = buffer.readableBytes();    byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];    // 先读取协议头    buffer.readBytes(header);    //分析Dubbo协议数据部分    return decode(channel, buffer, readable, header);}@Overrideprotected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {    // 检查魔数,确认为Dubbo协议帧    // check magic number.    if (readable > 0 && header[0] != MAGIC_HIGH            || readable > 1 && header[1] != MAGIC_LOW) {        int length = header.length;        if (header.length < readable) {            header = Bytes.copyOf(header, readable);            buffer.readBytes(header, length, readable - length);        }        for (int i = 1; i < header.length - 1; i++) {            if (header == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {                buffer.readerIndex(buffer.readerIndex() - header.length + i);                header = Bytes.copyOf(header, i);                break;            }        }        return super.decode(channel, buffer, readable, header);    }    // 检查是否读取了一个完整的Dubbo协议头    // check length.    if (readable < HEADER_LENGTH) {        return DecodeResult.NEED_MORE_INPUT;    }    // 从协议头的最后四个字节读取协议数据部分的巨细    // get data length.    int len = Bytes.bytes2int(header, 12);    checkPayload(channel, len);    // 如采遇到半包问题,则直接返回    int tt = len + HEADER_LENGTH;    if (readable < tt) {        return DecodeResult.NEED_MORE_INPUT;    }    // 分析协议数据部分    // limit input stream.    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);    try {        // 这里调用的是子类DubboCodec        return decodeBody(channel, is, header);    } finally {        if (is.available() > 0) {            try {                if (logger.isWarnEnabled()) {                    logger.warn("Skip input stream " + is.available());                }                StreamUtils.skipUnusedStream(is);            } catch (IOException e) {                logger.warn(e.getMessage(), e);            }        }    }}

精彩评论7

Michae1 发表于 2021-4-20 08:19:11 | 显示全部楼层
Dubbo 的核心源码和原理剖析
小船掀大浪 发表于 2021-4-20 23:53:09 | 显示全部楼层
转发了
逞不完的強 发表于 2021-4-20 15:16:54 | 显示全部楼层
转发了
花山137815319 发表于 2021-4-20 08:37:02 | 显示全部楼层
转发了
花山137815319 发表于 2021-4-20 08:37:00 | 显示全部楼层
转发了
剑指南山1206 发表于 2021-4-19 22:32:25 | 显示全部楼层
转发了
屯田乐乐 发表于 2021-4-19 20:06:53 | 显示全部楼层
转发了
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

猜你喜欢
在线客服邮箱
wxcy#wkgb.net

邮箱地址#换为@

Powered by 创意电子 ©2018-现在 专注资源实战分享源码下载站联盟商城