摘要:分析到這裏, doBind() 方法中兩個重要方法: initAndRegister() 和 doBind0() 的第一個方法已全部分析完成, initAndRegister() 還是完成了相當多的任務,其核心邏輯總結下:創建 NioServerSocketChannel ,然後進行各種配置初始化,最重要的一步是把 channel 註冊到 NioEventLoop 上, NioEventLoop 採用單線程模式輪詢事件、處理事件。//創建NioServerSocketChannel -> pipeline添加ServerBootstrapAcceptor -> channel進行register,分配NioEventLoop。

Netty 服務端一般如下面代碼模式,簡化了 NIO 編程的複雜性同時,並且藉助於 Pipeline 模型,可以很簡單的就構建出高性能、可擴展的應用程序。

public class DemoServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new TestServerInitializer());

ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

下面我們就通過源碼分析下隱藏在這些代碼背後的邏輯,對 Netty 可以有個更加深刻的認識。

NioEventLoopGroup

Netty 中每個 Channel 都會通過註冊方式,綁定到一個具體的 NioEventLoop 實例上, NioEventLoop 繼承抽象類 SingleThreadEventLoop ,內部通過單個線程模式管理所有註冊到它上面的 Channel ,負責這些 Channel 事件監聽、事件處理等。 NioEventLoopGroup 內部包含一組 NioEventLoop ,好比 NioEventLoop 是用於管理 Channel 的其中一個線程,而 NioEventLoopGroup 則對應的是管理所有 Channel 的線程池。

上面創建了兩個 NioEventLoopGroup 對象,一個是用來管理 NioServerSocketChannel 的,而另一個是用來管理客戶端連接進來時創建的客戶端對應的 NioSocketChannel 的。

通過跟蹤 NioEventLoopGroup 構造過程,本身邏輯是比較簡單,但是調用棧比較深,這裏就不太方便代碼展示,其大概完成事情可以用如下圖描述:

NioEventLoopGroup 創建時,同時會創建三個元素: executorchooserchild

  • child
    NioEventLoop
    Group
    NioEventLoop
    CPU核數*2
    NioEventLoop
    
    • executor
      NioEventLoop
      executor
      
    • taskQueue
      NioEventLoop
      NioEventLoop
      taskQueue
      NioEventLoop
      taskQueue
      
    • rejectedHandler
      addTask()
      taskQueue
      
    • Selector
      NioEventLoop
      Selector
      Channel
      Selector
      SelectionKey
      NioEventLoop
      
  • executor
    NioEventLoop
    Channel
    executor
    
  • chooser
    NioEventLoopGroup
    NioEventLoop
    Channel
    chooser
    NioEventLoop
    

ServerBootStrap配置

下面我們來分析下如下代碼作用:

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new TestServerInitializer());

ServerBootStrapNetty 使用的一個啓動引導類,上面的代碼主要是爲後續 Netty 啓動提供配置數據,本身比較簡單:

  • group(bossGroup, workerGroup)
    Netty
    bossGroup
    NioServerSocketChannel
    OP_ACCEPT
    workerGroup
    NioSocketChannel
    
  • channel(NioServerSocketChannel.class) :用於指定網絡模型。
  • childHandler(new TestServerInitializer())
    NioSocketChannel
    channel
    pipeline
    channel
    ChannelInitializer
    handler
    pipeline
    handler
    

bind

當執行到 serverBootstrap.bind(8899) ,則表示 Netty 開始進入真正的啓動階段。一路跟蹤下來,會進入到 doBind() 方法中:

private ChannelFuture doBind(final SocketAddress localAddress) {
//創建NioServerSocketChannel -> pipeline添加ServerBootstrapAcceptor -> channel進行register,分配NioEventLoop
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}

/**
* register完成,則執行doBind0()進行server端口綁定
*/

if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
//register還未完成,則添加listener,待註冊完成再執行doBind0()進行server端口綁定
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}

這個方法主要完成兩件事:

  • initAndRegister()
    Channel
    Channel
    Channel
    NioEventLoop
    NioEventLoop
    
  • doBind0()
    initAndRegister()
    doBind0()
    initAndRegister()
    doBind0()
    regFuture.isDone()
    Channel
    addListener()
    listener
    doBind0()
    channel
    channel
    

initAndRegister

final ChannelFuture initAndRegister() {
Channel channel = null;
try {
/**
*
* 通過反射方式創建IO模型類型,具體類型有serverBootstrap.channel()方法指定,比如:NioServerSocketChannel
* NioServerSocketChannel創建時,構造方法中會觸發創建jdk channel創建
* 同時會創建對應的配置類:NioServerSocketChannelConfig(tcp參數配置)
*/

channel = channelFactory.newChannel();
/**
* 初始化channel,由子類bootstrap或者serverBootStrap進行實現,可視爲一個模板方法
* ServerBootStrap邏輯:options、attrs等初始化,同時向pipeline中添加一個InboundHandler:ServerBootstrapAcceptor
*
* new ServerBootstrapAcceptor(serverSocketChannel, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)
* 這樣NioServerSocketChannel接收到OP_ACCEPT事件時,就可以利用這些參數給代表客戶端連接的SocketChannel初始化
*/

init(channel);//
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
//還沒有註冊到線程池。使用默認線程GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
/**
* 將NioServerSocketChannel註冊到Reactor主線程池上 ,即給當前創建的Channel分配一個NioEventLoop線程
*/

ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}

return regFuture;
}

這個方法主要完成3件事:

  • channelFactory.newChannel()
    Channel
    serverBootstrap.channel()
    .channel(NioServerSocketChannel.class)
    
  • init(channel)
    Channel
    options
    attrs
    pipeline
    handler
    ServerBootstrapAcceptor
    
new ServerBootstrapAcceptor(serverSocketChannel, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)

ServerBootstrapAcceptor 連接處理器是 Server 端非常重要的一個 InBound 類型的 handler ,當 NioServerSocketChannel 輪詢 OP_ACCEPT 事件接收到客戶端連接進來時,客戶端連接各種設置等工作就是由這個 Acceptor 連接器完成。

  • config().group().register(channel)
    Channel
    NioEventLoop
    NioEventLoop
    channel
    NioEventLoop
    Selector
    

newChannel() 比較簡單,這裏就不展開了,核心點主要在於 init(channel)register(channel) 這兩個方法。

init

void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}

final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}

ChannelPipeline p = channel.pipeline();

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}

/**
* 給NIOServerChannel綁定的pipeline添加一個ChannelInitializer
*/

p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

這個方法主要完成2件事:

  • NioServerSocketChannel
    option
    attr
    
  • NioServerSocketChannel
    Pipeline
    ChannelInitializer
    ChannelInitializer
    pipeline
    handler
    NioServerSocketChannel
    handler
    ServerBootstrapAcceptor
    

順便我們來看下通過 pipeline.addLast() 方式向 pipeline 添加 handler 邏輯:

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);

newCtx = newContext(group, filterName(name, handler), handler);

addLast0(newCtx);

//當前Channel還未註冊,需要先封裝成PendingHandlerAddedTask,並鏈表方式掛載到Pipeline的pendingHandlerCallbackHead變量下,待後續註冊完成後再回調
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}

//註冊完成,且當前線程和Channel綁定線程不是同一個,則用Channel的綁定線程執行
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}

pipeline 是一個雙向鏈表,剛創建完成時默認有兩個節點: headtail ,如下圖:

執行 p.addLast(new ChannelInitializer()) 後是如下圖:

如上面代碼, pipeline 不是直接將 handler 添加進來,而是封裝成 handlerContext 。執行 addLast0(newCtx)handler 對應的 HandlerContext 添加進來後,正常情況下這時需要回調 handler#handlerAdded() 方法。 handler#handlerAdded() 執行是需要在 channel 註冊的 NioEventLoop 線程中執行纔行,所以有 if (!executor.inEventLoop()) 這個判斷。但是,當前是在主線程 main 中,且 channel 因爲還沒有註冊完成,所以當前 channelNioEventLoop 根本就還沒有綁定到一起,所以是沒法執行的,這裏會進入 if (!registered) 流程:將 handlerContext 封裝成一個 PendingHandlerAddedTask 實例,先掛載到 pipelinependingHandlerCallbackHead 全局變量下,待後續 channel 註冊完成後再來處理 handler#handlerAdded() 。還有個問題,如果添加多個 handlerPendingHandlerAddedTask 有個 next ,可以把它們串成一個鏈表即可。

register

這樣,我們把 init() 方法的主要邏輯基本都分析完成了,現在我們再回過頭看下 initAndRegister 方法中另外一個重要邏輯: config().group().register(channel) 。這裏的 config().group() 就是獲取的是之前傳入的用於處理 server 端線程組: EventLoopGroup bossGroup = new NioEventLoopGroup()

NioEventLoopGroup#register() 第一步就是使用 chooser 選取一個其管理的 NioEventLoop ,默認選取策略很簡單,就是使用一個遞增序列 idx ,然後和數組長度取模即可:

executors[idx.getAndIncrement() & executors.length - 1]

選取好 NioEventLoop 後,調用 NioEventLoop#register(channel) 方法, NioEventLoop#register(channel) 方法又會調用 channleUnsafe 對象的 register 進行處理,並把自己即 NioEventLoop 作爲參數傳入:

promise.channel().unsafe().register(this, promise);

Channel 創建時同時創建一個 UnSafe 對象,主要用於處理與 java 底層 socket 相關操作。

所以, register() 方法跑了一圈最後還是在 channel 中的 Unsafe#register() 方法中進行處理, NioEventLoopGroup 只是利用 chooser 選取一個 NioEventLoop 作爲參數傳入到 register() 方法中。

我們來看下 Unsafe#register() 方法做了哪些事情。

1、後面這個 eventLoop 就是將剛纔利用 chooser 選取的 NioEventLoop ,通過賦值給 channeleventLoop 字段上,即完成了 channelNioEventLoop 的關聯;

AbstractChannel.this.eventLoop = eventLoop;

2、調用 register0() 方法, register0() 方法需要在 NioEventLoop 線程中執行纔行,所以這裏也使用 if (eventLoop.inEventLoop()) 判斷下,當前是主線程 main ,所以會進入到 else 邏輯處理中,把執行邏輯封裝成任務提交到 NioEventLoop 的任務隊列 taskQueue 中:

if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

eventLoop.execute() 方法中處理不只是簡單將 task 放入到 taskQueue 中,我們來看下其還做了哪些事:

public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
}
if (reject) {
reject();
}
}
}

if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}

這個方法主要完成3件事:

  • addTask(task)
    register0()
    taskQueue
    taskQueue
    
  • startThread()
    NioEventLoop
    channel
    startThread()
    if (!inEventLoop)
    NioEventLoop
    inEventLoop
    false
    NioEventLoop
    selector.select()
    processSelectedKeys()
    runAllTasks()
    
  • wakeup()
    NioEventLoop
    NioEventLoop
    selector.select(timeout)
    wakeup()
    NioEventLoop
    select()
    

eventLoop.execute() 分析完成後, register0() 方法任務已被添加到 taskQueue 中,然後啓動 NioEventLoop 線程開始幹活,最後通過 wakeup() 喚醒 NioEventLoop 讓其去處理 taskQueue 中的任務,所以,這時我們需要再回頭看下 register0() 方法。

private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;

pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {//這裏實際返回false,channelActive()不會在這裏觸發
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

這個方法主要完成3件事:

  • doRegister()
    java api
    channel
    selector
    javaChannel().register(eventLoop().unwrappedSelector(), 0, this)
    
    • channel
      NioEventLoop
      Selector
      
    • SelectionKey=0
      channel
      selector
      OP_ACCEPT
      
    • this
      attachment
      this
      server
      NioServerSocketChannel
      selector
      OP_ACCEPT
      NioServerSocketChannel
      accept()
      
  • pipeline.invokeHandlerAddedIfNeeded()
    pipeline
    handler
    channel
    handler
    PendingHandlerAddedTask
    pipeline.pendingHandlerCallbackHead
    channel
    pendingHandlerCallbackHead
    handler#handlerAdded()
    ChannelInitializer#handlerAdded()
    initChannel()
    handler
    ServerBootstrapAcceptor
    pipeline
    ChannelInitializer
    pipeline
    
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
  • pipeline.fireChannelRegistered() :回調 handler#channelRegistered() 方法;
  • pipeline.fireChannelActive()
    channel
    if (isActive())
    channel
    isActive()
    true
    

總結

分析到這裏, doBind() 方法中兩個重要方法: initAndRegister()doBind0() 的第一個方法已全部分析完成, initAndRegister() 還是完成了相當多的任務,其核心邏輯總結下:創建 NioServerSocketChannel ,然後進行各種配置初始化,最重要的一步是把 channel 註冊到 NioEventLoop 上, NioEventLoop 採用單線程模式輪詢事件、處理事件。 handler 回調方法: handlerAdded()channelRegistered() 也會在上面執行過程中被觸發調用。

長按識別關注, 持續輸出原創

相關文章