摘要:分析到这里, doBind() 方法中两个重要方法: initAndRegister() 和 doBind0() 的第一个方法已全部分析完成, initAndRegister() 还是完成了相当多的任务,其核心逻辑总结下:创建 NioServerSocketChannel ,然后进行各种配置初始化,最重要的一步是把 channel 注册到 NioEventLoop 上, NioEventLoop 采用单线程模式轮询事件、处理事件。//创建NioServerSocketChannel -> pipeline添加ServerBootstrapAcceptor -> channel进行register,分配NioEventLoop。

Netty 服务端一般如下面代码模式,简化了 NIO 编程的复杂性同时,并且借助于 Pipeline 模型,可以很简单的就构建出高性能、可扩展的应用程序。

public class DemoServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();

try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new TestServerInitializer());

ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

下面我们就通过源码分析下隐藏在这些代码背后的逻辑,对 Netty 可以有个更加深刻的认识。

NioEventLoopGroup

Netty 中每个 Channel 都会通过注册方式,绑定到一个具体的 NioEventLoop 实例上, NioEventLoop 继承抽象类 SingleThreadEventLoop ,内部通过单个线程模式管理所有注册到它上面的 Channel ,负责这些 Channel 事件监听、事件处理等。 NioEventLoopGroup 内部包含一组 NioEventLoop ,好比 NioEventLoop 是用于管理 Channel 的其中一个线程,而 NioEventLoopGroup 则对应的是管理所有 Channel 的线程池。

上面创建了两个 NioEventLoopGroup 对象,一个是用来管理 NioServerSocketChannel 的,而另一个是用来管理客户端连接进来时创建的客户端对应的 NioSocketChannel 的。

通过跟踪 NioEventLoopGroup 构造过程,本身逻辑是比较简单,但是调用栈比较深,这里就不太方便代码展示,其大概完成事情可以用如下图描述:

NioEventLoopGroup 创建时,同时会创建三个元素: executorchooserchild

  • child
    NioEventLoop
    Group
    NioEventLoop
    CPU核数*2
    NioEventLoop
    
    • executor
      NioEventLoop
      executor
      
    • taskQueue
      NioEventLoop
      NioEventLoop
      taskQueue
      NioEventLoop
      taskQueue
      
    • rejectedHandler
      addTask()
      taskQueue
      
    • Selector
      NioEventLoop
      Selector
      Channel
      Selector
      SelectionKey
      NioEventLoop
      
  • executor
    NioEventLoop
    Channel
    executor
    
  • chooser
    NioEventLoopGroup
    NioEventLoop
    Channel
    chooser
    NioEventLoop
    

ServerBootStrap配置

下面我们来分析下如下代码作用:

ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new TestServerInitializer());

ServerBootStrapNetty 使用的一个启动引导类,上面的代码主要是为后续 Netty 启动提供配置数据,本身比较简单:

  • group(bossGroup, workerGroup)
    Netty
    bossGroup
    NioServerSocketChannel
    OP_ACCEPT
    workerGroup
    NioSocketChannel
    
  • channel(NioServerSocketChannel.class) :用于指定网络模型。
  • childHandler(new TestServerInitializer())
    NioSocketChannel
    channel
    pipeline
    channel
    ChannelInitializer
    handler
    pipeline
    handler
    

bind

当执行到 serverBootstrap.bind(8899) ,则表示 Netty 开始进入真正的启动阶段。一路跟踪下来,会进入到 doBind() 方法中:

private ChannelFuture doBind(final SocketAddress localAddress) {
//创建NioServerSocketChannel -> pipeline添加ServerBootstrapAcceptor -> channel进行register,分配NioEventLoop
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}

/**
* register完成,则执行doBind0()进行server端口绑定
*/

if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
//register还未完成,则添加listener,待注册完成再执行doBind0()进行server端口绑定
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}

这个方法主要完成两件事:

  • initAndRegister()
    Channel
    Channel
    Channel
    NioEventLoop
    NioEventLoop
    
  • doBind0()
    initAndRegister()
    doBind0()
    initAndRegister()
    doBind0()
    regFuture.isDone()
    Channel
    addListener()
    listener
    doBind0()
    channel
    channel
    

initAndRegister

final ChannelFuture initAndRegister() {
Channel channel = null;
try {
/**
*
* 通过反射方式创建IO模型类型,具体类型有serverBootstrap.channel()方法指定,比如:NioServerSocketChannel
* NioServerSocketChannel创建时,构造方法中会触发创建jdk channel创建
* 同时会创建对应的配置类:NioServerSocketChannelConfig(tcp参数配置)
*/

channel = channelFactory.newChannel();
/**
* 初始化channel,由子类bootstrap或者serverBootStrap进行实现,可视为一个模板方法
* ServerBootStrap逻辑:options、attrs等初始化,同时向pipeline中添加一个InboundHandler:ServerBootstrapAcceptor
*
* new ServerBootstrapAcceptor(serverSocketChannel, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)
* 这样NioServerSocketChannel接收到OP_ACCEPT事件时,就可以利用这些参数给代表客户端连接的SocketChannel初始化
*/

init(channel);//
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
//还没有注册到线程池。使用默认线程GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
/**
* 将NioServerSocketChannel注册到Reactor主线程池上 ,即给当前创建的Channel分配一个NioEventLoop线程
*/

ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}

return regFuture;
}

这个方法主要完成3件事:

  • channelFactory.newChannel()
    Channel
    serverBootstrap.channel()
    .channel(NioServerSocketChannel.class)
    
  • init(channel)
    Channel
    options
    attrs
    pipeline
    handler
    ServerBootstrapAcceptor
    
new ServerBootstrapAcceptor(serverSocketChannel, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)

ServerBootstrapAcceptor 连接处理器是 Server 端非常重要的一个 InBound 类型的 handler ,当 NioServerSocketChannel 轮询 OP_ACCEPT 事件接收到客户端连接进来时,客户端连接各种设置等工作就是由这个 Acceptor 连接器完成。

  • config().group().register(channel)
    Channel
    NioEventLoop
    NioEventLoop
    channel
    NioEventLoop
    Selector
    

newChannel() 比较简单,这里就不展开了,核心点主要在于 init(channel)register(channel) 这两个方法。

init

void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}

final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}

ChannelPipeline p = channel.pipeline();

final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}

/**
* 给NIOServerChannel绑定的pipeline添加一个ChannelInitializer
*/

p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

这个方法主要完成2件事:

  • NioServerSocketChannel
    option
    attr
    
  • NioServerSocketChannel
    Pipeline
    ChannelInitializer
    ChannelInitializer
    pipeline
    handler
    NioServerSocketChannel
    handler
    ServerBootstrapAcceptor
    

顺便我们来看下通过 pipeline.addLast() 方式向 pipeline 添加 handler 逻辑:

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);

newCtx = newContext(group, filterName(name, handler), handler);

addLast0(newCtx);

//当前Channel还未注册,需要先封装成PendingHandlerAddedTask,并链表方式挂载到Pipeline的pendingHandlerCallbackHead变量下,待后续注册完成后再回调
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}

//注册完成,且当前线程和Channel绑定线程不是同一个,则用Channel的绑定线程执行
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}

pipeline 是一个双向链表,刚创建完成时默认有两个节点: headtail ,如下图:

执行 p.addLast(new ChannelInitializer()) 后是如下图:

如上面代码, pipeline 不是直接将 handler 添加进来,而是封装成 handlerContext 。执行 addLast0(newCtx)handler 对应的 HandlerContext 添加进来后,正常情况下这时需要回调 handler#handlerAdded() 方法。 handler#handlerAdded() 执行是需要在 channel 注册的 NioEventLoop 线程中执行才行,所以有 if (!executor.inEventLoop()) 这个判断。但是,当前是在主线程 main 中,且 channel 因为还没有注册完成,所以当前 channelNioEventLoop 根本就还没有绑定到一起,所以是没法执行的,这里会进入 if (!registered) 流程:将 handlerContext 封装成一个 PendingHandlerAddedTask 实例,先挂载到 pipelinependingHandlerCallbackHead 全局变量下,待后续 channel 注册完成后再来处理 handler#handlerAdded() 。还有个问题,如果添加多个 handlerPendingHandlerAddedTask 有个 next ,可以把它们串成一个链表即可。

register

这样,我们把 init() 方法的主要逻辑基本都分析完成了,现在我们再回过头看下 initAndRegister 方法中另外一个重要逻辑: config().group().register(channel) 。这里的 config().group() 就是获取的是之前传入的用于处理 server 端线程组: EventLoopGroup bossGroup = new NioEventLoopGroup()

NioEventLoopGroup#register() 第一步就是使用 chooser 选取一个其管理的 NioEventLoop ,默认选取策略很简单,就是使用一个递增序列 idx ,然后和数组长度取模即可:

executors[idx.getAndIncrement() & executors.length - 1]

选取好 NioEventLoop 后,调用 NioEventLoop#register(channel) 方法, NioEventLoop#register(channel) 方法又会调用 channleUnsafe 对象的 register 进行处理,并把自己即 NioEventLoop 作为参数传入:

promise.channel().unsafe().register(this, promise);

Channel 创建时同时创建一个 UnSafe 对象,主要用于处理与 java 底层 socket 相关操作。

所以, register() 方法跑了一圈最后还是在 channel 中的 Unsafe#register() 方法中进行处理, NioEventLoopGroup 只是利用 chooser 选取一个 NioEventLoop 作为参数传入到 register() 方法中。

我们来看下 Unsafe#register() 方法做了哪些事情。

1、后面这个 eventLoop 就是将刚才利用 chooser 选取的 NioEventLoop ,通过赋值给 channeleventLoop 字段上,即完成了 channelNioEventLoop 的关联;

AbstractChannel.this.eventLoop = eventLoop;

2、调用 register0() 方法, register0() 方法需要在 NioEventLoop 线程中执行才行,所以这里也使用 if (eventLoop.inEventLoop()) 判断下,当前是主线程 main ,所以会进入到 else 逻辑处理中,把执行逻辑封装成任务提交到 NioEventLoop 的任务队列 taskQueue 中:

if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

eventLoop.execute() 方法中处理不只是简单将 task 放入到 taskQueue 中,我们来看下其还做了哪些事:

public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
}
if (reject) {
reject();
}
}
}

if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}

这个方法主要完成3件事:

  • addTask(task)
    register0()
    taskQueue
    taskQueue
    
  • startThread()
    NioEventLoop
    channel
    startThread()
    if (!inEventLoop)
    NioEventLoop
    inEventLoop
    false
    NioEventLoop
    selector.select()
    processSelectedKeys()
    runAllTasks()
    
  • wakeup()
    NioEventLoop
    NioEventLoop
    selector.select(timeout)
    wakeup()
    NioEventLoop
    select()
    

eventLoop.execute() 分析完成后, register0() 方法任务已被添加到 taskQueue 中,然后启动 NioEventLoop 线程开始干活,最后通过 wakeup() 唤醒 NioEventLoop 让其去处理 taskQueue 中的任务,所以,这时我们需要再回头看下 register0() 方法。

private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;

pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {//这里实际返回false,channelActive()不会在这里触发
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}

这个方法主要完成3件事:

  • doRegister()
    java api
    channel
    selector
    javaChannel().register(eventLoop().unwrappedSelector(), 0, this)
    
    • channel
      NioEventLoop
      Selector
      
    • SelectionKey=0
      channel
      selector
      OP_ACCEPT
      
    • this
      attachment
      this
      server
      NioServerSocketChannel
      selector
      OP_ACCEPT
      NioServerSocketChannel
      accept()
      
  • pipeline.invokeHandlerAddedIfNeeded()
    pipeline
    handler
    channel
    handler
    PendingHandlerAddedTask
    pipeline.pendingHandlerCallbackHead
    channel
    pendingHandlerCallbackHead
    handler#handlerAdded()
    ChannelInitializer#handlerAdded()
    initChannel()
    handler
    ServerBootstrapAcceptor
    pipeline
    ChannelInitializer
    pipeline
    
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}

ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
  • pipeline.fireChannelRegistered() :回调 handler#channelRegistered() 方法;
  • pipeline.fireChannelActive()
    channel
    if (isActive())
    channel
    isActive()
    true
    

总结

分析到这里, doBind() 方法中两个重要方法: initAndRegister()doBind0() 的第一个方法已全部分析完成, initAndRegister() 还是完成了相当多的任务,其核心逻辑总结下:创建 NioServerSocketChannel ,然后进行各种配置初始化,最重要的一步是把 channel 注册到 NioEventLoop 上, NioEventLoop 采用单线程模式轮询事件、处理事件。 handler 回调方法: handlerAdded()channelRegistered() 也会在上面执行过程中被触发调用。

长按识别关注, 持续输出原创

相关文章