Netty 源碼分析(一):Netty Server 啓動流程(上)
摘要:分析到這裏, doBind() 方法中兩個重要方法: initAndRegister() 和 doBind0() 的第一個方法已全部分析完成, initAndRegister() 還是完成了相當多的任務,其核心邏輯總結下:創建 NioServerSocketChannel ,然後進行各種配置初始化,最重要的一步是把 channel 註冊到 NioEventLoop 上, NioEventLoop 採用單線程模式輪詢事件、處理事件。//創建NioServerSocketChannel -> pipeline添加ServerBootstrapAcceptor -> channel進行register,分配NioEventLoop。
Netty
服務端一般如下面代碼模式,簡化了 NIO
編程的複雜性同時,並且藉助於 Pipeline
模型,可以很簡單的就構建出高性能、可擴展的應用程序。
public class DemoServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new TestServerInitializer());
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
下面我們就通過源碼分析下隱藏在這些代碼背後的邏輯,對 Netty
可以有個更加深刻的認識。
NioEventLoopGroup
Netty
中每個 Channel
都會通過註冊方式,綁定到一個具體的 NioEventLoop
實例上, NioEventLoop
繼承抽象類 SingleThreadEventLoop
,內部通過單個線程模式管理所有註冊到它上面的 Channel
,負責這些 Channel
事件監聽、事件處理等。 NioEventLoopGroup
內部包含一組 NioEventLoop
,好比 NioEventLoop
是用於管理 Channel
的其中一個線程,而 NioEventLoopGroup
則對應的是管理所有 Channel
的線程池。
上面創建了兩個 NioEventLoopGroup
對象,一個是用來管理 NioServerSocketChannel
的,而另一個是用來管理客戶端連接進來時創建的客戶端對應的 NioSocketChannel
的。
通過跟蹤 NioEventLoopGroup
構造過程,本身邏輯是比較簡單,但是調用棧比較深,這裏就不太方便代碼展示,其大概完成事情可以用如下圖描述:
NioEventLoopGroup
創建時,同時會創建三個元素: executor
、 chooser
和 child
:
-
child NioEventLoop Group NioEventLoop CPU核數*2 NioEventLoop
-
executor NioEventLoop executor
-
taskQueue NioEventLoop NioEventLoop taskQueue NioEventLoop taskQueue
-
rejectedHandler addTask() taskQueue
-
Selector NioEventLoop Selector Channel Selector SelectionKey NioEventLoop
-
executor NioEventLoop Channel executor
-
chooser NioEventLoopGroup NioEventLoop Channel chooser NioEventLoop
ServerBootStrap配置
下面我們來分析下如下代碼作用:
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new TestServerInitializer());
ServerBootStrap
是 Netty
使用的一個啓動引導類,上面的代碼主要是爲後續 Netty
啓動提供配置數據,本身比較簡單:
-
group(bossGroup, workerGroup) Netty bossGroup NioServerSocketChannel OP_ACCEPT workerGroup NioSocketChannel
-
channel(NioServerSocketChannel.class)
:用於指定網絡模型。 -
childHandler(new TestServerInitializer()) NioSocketChannel channel pipeline channel ChannelInitializer handler pipeline handler
bind
當執行到 serverBootstrap.bind(8899)
,則表示 Netty
開始進入真正的啓動階段。一路跟蹤下來,會進入到 doBind()
方法中:
private ChannelFuture doBind(final SocketAddress localAddress) {
//創建NioServerSocketChannel -> pipeline添加ServerBootstrapAcceptor -> channel進行register,分配NioEventLoop
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
/**
* register完成,則執行doBind0()進行server端口綁定
*/
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
//register還未完成,則添加listener,待註冊完成再執行doBind0()進行server端口綁定
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
這個方法主要完成兩件事:
-
initAndRegister() Channel Channel Channel NioEventLoop NioEventLoop
-
doBind0() initAndRegister() doBind0() initAndRegister() doBind0() regFuture.isDone() Channel addListener() listener doBind0() channel channel
initAndRegister
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
/**
*
* 通過反射方式創建IO模型類型,具體類型有serverBootstrap.channel()方法指定,比如:NioServerSocketChannel
* NioServerSocketChannel創建時,構造方法中會觸發創建jdk channel創建
* 同時會創建對應的配置類:NioServerSocketChannelConfig(tcp參數配置)
*/
channel = channelFactory.newChannel();
/**
* 初始化channel,由子類bootstrap或者serverBootStrap進行實現,可視爲一個模板方法
* ServerBootStrap邏輯:options、attrs等初始化,同時向pipeline中添加一個InboundHandler:ServerBootstrapAcceptor
*
* new ServerBootstrapAcceptor(serverSocketChannel, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)
* 這樣NioServerSocketChannel接收到OP_ACCEPT事件時,就可以利用這些參數給代表客戶端連接的SocketChannel初始化
*/
init(channel);//
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
//還沒有註冊到線程池。使用默認線程GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
/**
* 將NioServerSocketChannel註冊到Reactor主線程池上 ,即給當前創建的Channel分配一個NioEventLoop線程
*/
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
這個方法主要完成3件事:
-
channelFactory.newChannel() Channel serverBootstrap.channel() .channel(NioServerSocketChannel.class)
-
init(channel) Channel options attrs pipeline handler ServerBootstrapAcceptor
new ServerBootstrapAcceptor(serverSocketChannel, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)
ServerBootstrapAcceptor
連接處理器是 Server
端非常重要的一個 InBound
類型的 handler
,當 NioServerSocketChannel
輪詢 OP_ACCEPT
事件接收到客戶端連接進來時,客戶端連接各種設置等工作就是由這個 Acceptor
連接器完成。
-
config().group().register(channel) Channel NioEventLoop NioEventLoop channel NioEventLoop Selector
newChannel()
比較簡單,這裏就不展開了,核心點主要在於 init(channel)
和 register(channel)
這兩個方法。
init
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
/**
* 給NIOServerChannel綁定的pipeline添加一個ChannelInitializer
*/
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
這個方法主要完成2件事:
-
NioServerSocketChannel option attr
-
NioServerSocketChannel Pipeline ChannelInitializer ChannelInitializer pipeline handler NioServerSocketChannel handler ServerBootstrapAcceptor
順便我們來看下通過 pipeline.addLast()
方式向 pipeline
添加 handler
邏輯:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
//當前Channel還未註冊,需要先封裝成PendingHandlerAddedTask,並鏈表方式掛載到Pipeline的pendingHandlerCallbackHead變量下,待後續註冊完成後再回調
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
//註冊完成,且當前線程和Channel綁定線程不是同一個,則用Channel的綁定線程執行
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
pipeline
是一個雙向鏈表,剛創建完成時默認有兩個節點: head
和 tail
,如下圖:
執行 p.addLast(new ChannelInitializer())
後是如下圖:
如上面代碼, pipeline
不是直接將 handler
添加進來,而是封裝成 handlerContext
。執行 addLast0(newCtx)
將 handler
對應的 HandlerContext
添加進來後,正常情況下這時需要回調 handler#handlerAdded()
方法。 handler#handlerAdded()
執行是需要在 channel
註冊的 NioEventLoop
線程中執行纔行,所以有 if (!executor.inEventLoop())
這個判斷。但是,當前是在主線程 main
中,且 channel
因爲還沒有註冊完成,所以當前 channel
和 NioEventLoop
根本就還沒有綁定到一起,所以是沒法執行的,這裏會進入 if (!registered)
流程:將 handlerContext
封裝成一個 PendingHandlerAddedTask
實例,先掛載到 pipeline
的 pendingHandlerCallbackHead
全局變量下,待後續 channel
註冊完成後再來處理 handler#handlerAdded()
。還有個問題,如果添加多個 handler
, PendingHandlerAddedTask
有個 next
,可以把它們串成一個鏈表即可。
register
這樣,我們把 init()
方法的主要邏輯基本都分析完成了,現在我們再回過頭看下 initAndRegister
方法中另外一個重要邏輯: config().group().register(channel)
。這裏的 config().group()
就是獲取的是之前傳入的用於處理 server
端線程組: EventLoopGroup bossGroup = new NioEventLoopGroup()
。
NioEventLoopGroup#register()
第一步就是使用 chooser
選取一個其管理的 NioEventLoop
,默認選取策略很簡單,就是使用一個遞增序列 idx
,然後和數組長度取模即可:
executors[idx.getAndIncrement() & executors.length - 1]
選取好 NioEventLoop
後,調用 NioEventLoop#register(channel)
方法, NioEventLoop#register(channel)
方法又會調用 channle
的 Unsafe
對象的 register
進行處理,並把自己即 NioEventLoop
作爲參數傳入:
promise.channel().unsafe().register(this, promise);
Channel
創建時同時創建一個 UnSafe
對象,主要用於處理與 java
底層 socket
相關操作。
所以, register()
方法跑了一圈最後還是在 channel
中的 Unsafe#register()
方法中進行處理, NioEventLoopGroup
只是利用 chooser
選取一個 NioEventLoop
作爲參數傳入到 register()
方法中。
我們來看下 Unsafe#register()
方法做了哪些事情。
1、後面這個 eventLoop
就是將剛纔利用 chooser
選取的 NioEventLoop
,通過賦值給 channel
的 eventLoop
字段上,即完成了 channel
和 NioEventLoop
的關聯;
AbstractChannel.this.eventLoop = eventLoop;
2、調用 register0()
方法, register0()
方法需要在 NioEventLoop
線程中執行纔行,所以這裏也使用 if (eventLoop.inEventLoop())
判斷下,當前是主線程 main
,所以會進入到 else
邏輯處理中,把執行邏輯封裝成任務提交到 NioEventLoop
的任務隊列 taskQueue
中:
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
eventLoop.execute()
方法中處理不只是簡單將 task
放入到 taskQueue
中,我們來看下其還做了哪些事:
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
這個方法主要完成3件事:
-
addTask(task) register0() taskQueue taskQueue
-
startThread() NioEventLoop channel startThread() if (!inEventLoop) NioEventLoop inEventLoop false NioEventLoop selector.select() processSelectedKeys() runAllTasks()
-
wakeup() NioEventLoop NioEventLoop selector.select(timeout) wakeup() NioEventLoop select()
eventLoop.execute()
分析完成後, register0()
方法任務已被添加到 taskQueue
中,然後啓動 NioEventLoop
線程開始幹活,最後通過 wakeup()
喚醒 NioEventLoop
讓其去處理 taskQueue
中的任務,所以,這時我們需要再回頭看下 register0()
方法。
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {//這裏實際返回false,channelActive()不會在這裏觸發
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
這個方法主要完成3件事:
-
doRegister() java api channel selector javaChannel().register(eventLoop().unwrappedSelector(), 0, this)
-
channel NioEventLoop Selector
-
SelectionKey=0 channel selector OP_ACCEPT
-
this attachment this server NioServerSocketChannel selector OP_ACCEPT NioServerSocketChannel accept()
-
pipeline.invokeHandlerAddedIfNeeded() pipeline handler channel handler PendingHandlerAddedTask pipeline.pendingHandlerCallbackHead channel pendingHandlerCallbackHead handler#handlerAdded() ChannelInitializer#handlerAdded() initChannel() handler ServerBootstrapAcceptor pipeline ChannelInitializer pipeline
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
-
pipeline.fireChannelRegistered()
:回調handler#channelRegistered()
方法; -
pipeline.fireChannelActive() channel if (isActive()) channel isActive() true
總結
分析到這裏, doBind()
方法中兩個重要方法: initAndRegister()
和 doBind0()
的第一個方法已全部分析完成, initAndRegister()
還是完成了相當多的任務,其核心邏輯總結下:創建 NioServerSocketChannel
,然後進行各種配置初始化,最重要的一步是把 channel
註冊到 NioEventLoop
上, NioEventLoop
採用單線程模式輪詢事件、處理事件。 handler
回調方法: handlerAdded()
和 channelRegistered()
也會在上面執行過程中被觸發調用。
長按識別關注, 持續輸出原創