戳藍字「TopCoder 」關注我們哦!

Netty支持多種服務端的server實例,包括mina、netty等,如下所示:

由於開發者目前使用dubbo幾乎都是基於 Netty4 的,因此下面的分析就以netty4的NettyServer爲例,dubbo啓動過程中會調用  NettyServer#doOpen 初始化和啓動netty server。這裏主要操作就是初始化 bossGroup 和 workerGroup,然後進行bind、設置channelHandler,一個標準的netty初始化啓動流程,具體代碼如下:

protected void doOpen() throws Throwable {
    bootstrap = new ServerBootstrap();

    bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
    workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
            new DefaultThreadFactory("NettyServerWorker", true));

    final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
    // the cache for alive worker channel
    channels = nettyServerHandler.getChannels();

    bootstrap.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
            .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
            .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
            .childHandler(new ChannelInitializer<NioSocketChannel>() {
                @Override
                protected void initChannel(NioSocketChannel ch) throws Exception {
                    int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                        ch.pipeline().addLast("negotiation",
                                SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
                    }
                    ch.pipeline()
                            .addLast("decoder", adapter.getDecoder())
                            .addLast("encoder", adapter.getEncoder())
                            // 空閒心跳檢測
                            .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                            .addLast("handler", nettyServerHandler);
                }
            });
    // bind
    ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
    channelFuture.syncUninterruptibly();
    channel = channelFuture.channel();
}

dubbo啓動netty server時,會創建心跳檢查的ChannelHandler,從IdleStateHandler的實現來看,它提供針對了 讀空閒檢測readerIdleTime、寫空閒檢測writerIdleTime和讀寫空閒檢測allIdleTime的能力,當 readerIdleTime、writerIdleTime或者allIdleTime 大於0時,會在channelActive時初始化對應的netty的延時任務。

public IdleStateHandler( long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
    this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
}

當任務到期執行時,會檢查上次的讀寫時間戳是否大於設定的最大空閒時間,如果大於則發送 IdleStateEvent 事件,這時就會觸發用戶設定的 ChannelHandlerfireUserEventTriggered 回調,針對上述代碼來說,就會走到dubbo中 org.apache.dubbo.remoting.transport.netty4.NettyServerHandler#userEventTriggered 方法中:

public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    // server will close channel when server don't receive any heartbeat from client util timeout.
    if (evt instanceof IdleStateEvent) {
        NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
        try {
            logger.info("IdleStateEvent triggered, close channel " + channel);
            channel.close();
        } finally {
            NettyChannel.removeChannelIfDisconnected(ctx.channel());
        }
    }
    super.userEventTriggered(ctx, evt);
}

默認的心跳超時時間是心跳間隔的3倍,從實現來看,如果心跳超時了,dubbo provider端會主動斷開連接,這說明comsumer端可能已經掛了或者重啓了。

從上述dubbo啓動netty的初始化代碼來看,當consumer發出的請求達到provider時,首先要經過解碼器InternalDecoder,注意這個解碼器只是簡單的轉發作用,實際上解碼工作是靠具體協議對應的解碼器的,比如針對dubbo協議來說就是DubboCountCodec。

注意:dubbo provider端的解碼流程不是本文的關注重點,因此大家只需知道其流程即可,關於編解碼這塊後續我會寫專門的文章來分析。

consumer的請求數據經過解碼之後就到達了dubbo業務處理的 ChannelHandler — NettyServerHandler

public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    // 傳遞給dubbo處理器
    handler.received(channel, msg);
}

關於dubbo中處理各種IO事件,和netty中處理類似也定義了一套處理回調接口,定義如下:

public interface ChannelHandler {
    void connected(Channel channel) throws RemotingException;
    void disconnected(Channel channel) throws RemotingException;
    void sent(Channel channel, Object message) throws RemotingException;
    void received(Channel channel, Object message) throws RemotingException;
    void caught(Channel channel, Throwable exception) throws RemotingException;
}

傳遞給dubbo處理器,會走到MultiMessageHandler處理器,由於dubbo定義的各種處理器實際上就是責任鏈的體現,爲了方便看流程,先看下大致的處理涉及的類圖:

  • MultiMessageHandler:提供了針對多請求的處理能力;

  • HeartbeatHandler:是針對心跳請求的處理邏輯,如果是心跳請求,則更新心跳時間戳,然後直接返回,這時是不會傳遞個接下來的處理器的;

  • AllChannelHandler:all線程模型的實現,這是dubbo provider端默認的線程模型,這種線程模型把所有事件都直接交給業務線程池進行處理了。

注意:dubbo的provider線程池模型不是本文關注的重點,因此大家理解節課,後續dubbo provider線程池模型這塊後續我會寫專門的文章來分析。

將請求數據傳遞給dubbo provider端的線程池來處理之後,接下來就是dubbo真正的業務處理流程了。也到了本文該結束的時刻了,關於dubbo provider後續的處理流程解析,歡迎大家看接下來的文章哈。

相關文章