4477 字
22 分钟
Netty4.2 - Server 启动源码跟踪

从 Netty4.2 源码中的 EchoServer 案例开始阅读,代码在 io.netty.example.echo.EchoServer

关键代码#

ServerBootstrap.bind()
- doBind()
- initAndRegister()
- init(channel)
- p.addLast(new ChannelInitializer)
- config().group().register(channel)
- promise.channel().unsafe().register(this, promise)
- doRegister()
- ((IoEventLoop) eventLoop()).register((AbstractNioUnsafe) unsafe())
- register0()
- pipeline.invokeHandlerAddedIfNeeded()
- doBind0()
- channel.bind(localAddress, promise)
- pipeline.fireChannelActive()
- readIfIsAutoRead()

ServerBootstrap 启动流程#

我们从 ServerBootstrap.doBind() 入口开始,在这一层主要干了两件事情

  • initAndRegister() # 初始化 Channel 和注册 eventLoop
  • doBind0() # 对 Channel 进行绑定端口
private ChannelFuture doBind(final SocketAddress localAddress) {
// 异步初始化和注册 Channel
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// 确保 Channel 注册成功后执行 doBind0 方法对 Channel 进行绑定端口
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}

initAndRegister()#

正如其名,这个方法也干了两件事

  • init(channel) # 初始化 Channel
  • config().group().register(channel) # 用户配置的 EventLoop 关联到 Channel
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// config().group() 用户配置的 bossGroup 的 eventLoop 关联到 channel
final ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}

init(channel) 方法给 Channel 设置了一些 option 和 attribute 属性,然后向 ChannelPipeline 添加了一个 ChannelInitializer 的 Handler

ChannelInitializer

一种特殊的 Handler,主要作用是初始化 Handler,ChannelInitializer 的 initChannel 方法执行完后会把自己从 ChannelPipeline 中移除

在这里 ChannelInitializer 的初始化主要做了两件事

  • 向 ChannelPipeline 添加用户设置的 Handler
  • 执行一个 EventLoop 异步任务,向 ChannelPipeline 添加 ServerBootstrapAcceptor(添加用户设置的 ChildHandler)

使用 EventLoop 执行异步任务是因为必须等 config().group().register(channel) 方法执行,EventLoop 初始化完成后才能执行

ServerBootstrapAcceptor

也是一种特殊的 Handler,当 server 接收到客户端的连接请求,通过 ServerBootstrapAcceptor,向 ChannelPipeline 添加用户设置的 ChildHandler

@Override
void init(Channel channel) {
// 设置一些 option 和 attribute 属性
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, newAttributesArray());
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);
final Collection<ChannelInitializerExtension> extensions = getInitializerExtensions();
// 添加 ChannelInitializer 初始化 Handler,在 EventLoop 执行时添加 ServerBootstrapAcceptor
// ServerBootstrapAcceptor 用于服务端接收新连接然后添加 ChildHandler 的 Handler 前置处理
// 通过 PendingHandlerCallback 持有 AbstractChannelHandlerContext 对象进行懒加载(eventLoop 关联到 channel 后)
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
// 添加用户设置的 Handler
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// EventLoop 执行任务时,异步添加 ServerBootstrapAcceptor
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs,
extensions));
}
});
}
});
// ...
}

config().group().register(channel) 方法作用是向用户配置的 EventLoopGroup 里的 EventLoop 关联 Channel 等注册逻辑,跟踪 register()方法

@Override
public ChannelFuture register(Channel channel) {
// next() 是 SingleThreadEventLoop,所以 register() 方法都在 EventLoop 线程中执行
return next().register(channel);
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}

通过 SingleThreadEventLoop 线程调用 promise.channel().unsafe().register(this, promise),使用 Channel 里的 unsafe 进行注册

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// 关联 EventLoop 到 Channel
AbstractChannel.this.eventLoop = eventLoop;
// Clear any cached executors from prior event loop registrations.
AbstractChannelHandlerContext context = pipeline.tail;
do {
context.contextExecutor = null;
context = context.prev;
} while (context != null);
// 必须在 eventLoop 线程中执行 register0 方法
// register0 关联(Selector 注册)、状态设置、事件触发、promise 等
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}

这段代码很明显,register0(promise) 是注册的核心处理方法

private void register0(ChannelPromise promise) {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
ChannelPromise registerPromise = newPromise();
boolean firstRegistration = neverRegistered;
// doRegister 注册完成后执行,向 ChannelPipeline 添加 Handler
registerPromise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// 执行 ChannelInitializer 的 config.handler() 添加用户配置的 handler,最后删除当前 ChannelInitializer
pipeline.invokeHandlerAddedIfNeeded();
// 设置状态,通知 promise 的 Listener
safeSetSuccess(promise);
// 递归调用 ChannelHandlerContext 所有的 channelRegistered 方法(事件传递)
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to
// begin read again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} else {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, future.cause());
}
}
});
// Channel 关联到 selector,关联各种组件
doRegister(registerPromise);
}

在以往的 Netty 版本源码中,是先调用 doRegister() 方法,然后再往 ChannelPipeline 添加 Handler

Netty4.2 把添加 Handler 的逻辑重构成回调的形式,放到 registerPromise 的 Listener 中,在 doRegister(registerPromise) 执行完成后再回调执行,所以我们先看 doRegister() 方法做了什么

doRegister()#

@SuppressWarnings("unchecked")
@Override
protected void doRegister(ChannelPromise promise) {
assert registration == null;
// IoHandle 是 (AbstractNioUnsafe) unsafe()
// IoHandle 获取 Channel 关联到 selector 返回 SelectionKey
// 通过 IoRegistration 关联 IoHandle 和 SelectionKey,返回 IoRegistration 对象
((IoEventLoop) eventLoop()).register((AbstractNioUnsafe) unsafe()).addListener(f -> {
if (f.isSuccess()) {
// Channel 在回调中关联 IoRegistration
registration = (IoRegistration) f.getNow();
// 设置 registerPromise 状态,通知监听器
promise.setSuccess();
} else {
promise.setFailure(f.cause());
}
});
}

这一段代码是 Netty4.2 中主要 重构的部分,涉及 IoEventLoop、IoHandler、IoHandle、IoRegistration 等核心组件的关联关系

继续深入跟踪 IoEventLoop.register() 方法,调用链有点长

@Override
public final Future<IoRegistration> register(final IoHandle handle) {
Promise<IoRegistration> promise = newPromise();
if (inEventLoop()) {
registerForIo0(handle, promise);
} else {
execute(() -> registerForIo0(handle, promise));
}
return promise;
}
private void registerForIo0(final IoHandle handle, Promise<IoRegistration> promise) {
assert inEventLoop();
final IoRegistration registration;
try {
// 通过 ioHandler 返回 IoRegistration
registration = ioHandler.register(handle);
} catch (Exception e) {
promise.setFailure(e);
return;
}
numRegistrations.incrementAndGet();
// 设置 promise 状态,回调里返回 IoRegistration
promise.setSuccess(new IoRegistrationWrapper(registration));
}
@Override
public IoRegistration register(IoHandle handle)
throws Exception {
NioIoHandle nioHandle = nioHandle(handle);
NioIoOps ops = NioIoOps.NONE;
boolean selected = false;
for (;;) {
try {
// 创建 IORegistration
return new DefaultNioRegistration(executor, nioHandle, ops, unwrappedSelector());
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
final class DefaultNioRegistration implements IoRegistration {
private final NioIoHandle handle;
private volatile SelectionKey key;
DefaultNioRegistration(ThreadAwareExecutor executor, NioIoHandle handle, NioIoOps initialOps, Selector selector)
throws IOException {
this.handle = handle;
// IoHandle 获取 channel 关联到 selector,返回 SelectionKey
// 注意最后一个参数 this,把自己当参数传进去
key = handle.selectableChannel().register(selector, initialOps.value, this);
}
}
public final SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException
{
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (!isOpen())
throw new ClosedChannelException();
synchronized (regLock) {
if (isBlocking())
throw new IllegalBlockingModeException();
synchronized (keyLock) {
// re-check if channel has been closed
if (!isOpen())
throw new ClosedChannelException();
// 判断是否重复注册
SelectionKey k = findKey(sel);
if (k != null) {
k.attach(att);
k.interestOps(ops);
} else {
// New registration
// 向 selector 注册操作数,返回一个 key
// att 是 IoRegistration,会绑定到 key 的 attach 里
k = ((AbstractSelector)sel).register(this, ops, att);
// 把 key 添加到 Channel 的 SelectionKey[] 中
addKey(k);
}
return k;
}
}
}

DefaultNioRegistration 的构造函数中,关联了 IoHandle,并且通过 IoHandle 获取 Channel 调用 它的 register() 方法向 JDK 的 selector 注册一个操作数,返回 SelectionKey 到 DefaultNioRegistration 的成员变量中

最终在 doRegister() 的 ((IoEventLoop) eventLoop()).register((AbstractNioUnsafe) unsafe()).addListener 回调中,把 Channel 和 IoRegistration 进行关联

unsafe

是 Channel 的内部接口,在具体的 Channel 中实现执行真正的 I/O 读写、连接、注册等底层操作,正如其名一般不暴露给用户

所以 Channel 和 selector 的关系,是通过互相持有 selector 的 key 进行绑定的,但 Netty4.2 中 通过 IoRegistration 间接持有 selector 的 key 进行了解耦

((IoEventLoop) eventLoop()).register((AbstractNioUnsafe) unsafe()).addListener(f -> {
if (f.isSuccess()) {
// Channel 在回调中关联 IoRegistration
registration = (IoRegistration) f.getNow();
// 设置 registerPromise 状态,通知 Listener
promise.setSuccess();
} else {
promise.setFailure(f.cause());
}
});

然后设置 registerPromise 的状态通知 Listener 执行,回到前面 register0() 方法,跟踪一下 Listener 里添加 Handler 的逻辑

registerPromise#

registerPromise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// ...
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// 执行 ChannelInitializer 的 config.handler() 添加用户配置的 handler,最后删除当前 ChannelInitializer
pipeline.invokeHandlerAddedIfNeeded();
// 设置状态,通知 promise 的 Listener
safeSetSuccess(promise);
// 递归调用 ChannelHandlerContext 所有的 channelRegistered 方法(事件传递)
pipeline.fireChannelRegistered();
// ...
}
});

先看 pipeline.invokeHandlerAddedIfNeeded() 方法

这个方法就是向 pipeline 添加 Handler,调用链比较长,我们只看核心的处理方法,重点关注 handlerAdded() 方法,因为 ChannelInitializer 是第一个 Handler,所以执行的是 ChannelInitializer 的 handlerAdded() 方法

  • pipeline.invokeHandlerAddedIfNeeded()
    • callHandlerAddedForAllHandlers()
      • task.execute()
        • callHandlerAdded0(ctx)
          • ctx.callHandlerAdded()
            • handler().handlerAdded(this)
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
// This should always be true with our current DefaultChannelPipeline implementation.
// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
// will be added in the expected order.
if (initChannel(ctx)) {
// We are done with init the Channel, removing the initializer now.
// 源码注解已经说明,初始化后,移除 ChannelInitializer
// 这里移除的是 ChannelInitializer 里 initMap 的 ctx
removeState(ctx);
}
}
}
@SuppressWarnings("unchecked")
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
// 执行 ChannelInitializer 的 initChannel 方法
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
if (!ctx.isRemoved()) {
// 移除当前 ChannelInitializer
ctx.pipeline().remove(this);
}
}
return true;
}
return false;
}

可以看到 initChannel() 方法执行完,会把 ChannelInitializer 从 ChannelPipeline 中移除,我们回顾一下 initChannel() 方法里做了什么

  • 向 ChannelPipeline 添加用户设置的 Handler
  • 执行一个 EventLoop 异步任务,向 ChannelPipeline 添加 ServerBootstrapAcceptor(添加用户设置的 ChildHandler)

继续往下看 pipeline.fireChannelRegistered() 方法,这个方法作用是通过 HeadContext 从头到尾递归 HandlerContext 传递 register 入站事件,显然是通过责任链模式实现的,这种通知算是支线,就不详细跟踪了

到此 ChannelPipeline 里 Handler 的初始化完成


在 doBind0() 方法中,绑定完端口后会调用另一个 fireChannelActive() 方法传递 active 入站事件,它们代码差别不大

doBind0()#

这个方法只做了一件事:对 Channel 进行绑定端口

通过 eventLoop 线程异步执行 bind() 方法,我们一路跟踪它

private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
@Override
default ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline().bind(localAddress, promise);
}
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
// 通过 TailContext 调用 bind 方法
return tail.bind(localAddress, promise);
}
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
ObjectUtil.checkNotNull(localAddress, "localAddress");
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
// MASK 用来标识某个 ChannelHandler 是否实现了某类,找到实现了 bind 方法的 HandlerContext
final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 递归执行 HandlerContext 的 bind 方法(事件传递)
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null, false);
}
return promise;
}

我们发现 bind() 方法是先通过 TailContext 调用,然后再通过 findContextOutbound(MASK_BIND) 方法获取下一个实现了 bind() 方法的 Handler 的 HandlerContext,调用 invokeBind() 方法传递 bind 出站事件

观察 findContextOutbound() 方法,它的函数名是 out 代表出站方向,从 TailContext 往前找 HandlerContext

Extend

也存在 findContextInbound() 方法,in 代表入站方向,从 HeadContext 往后找 HandlerContext

在之前的 pipeline.fireChannelRegistered() 方法,传递 register 入站事件中有使用到

private AbstractChannelHandlerContext findContextOutbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
// 往前找
ctx = ctx.prev;
// 通过 mask 掩码和 MASK_ONLY_OUTBOUND 集合掩码筛选符合的 HandlerContext
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
return ctx;
}

这里的 mask 是使用二进制标记出入站的事件类型掩码,通过位运算筛选事件

ChannelHandlerMask#

final class ChannelHandlerMask {
// Using to mask which methods must be called for a ChannelHandler.
static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
static final int MASK_CHANNEL_ACTIVE = 1 << 3;
static final int MASK_CHANNEL_INACTIVE = 1 << 4;
static final int MASK_CHANNEL_READ = 1 << 5;
static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
static final int MASK_BIND = 1 << 9;
static final int MASK_CONNECT = 1 << 10;
static final int MASK_DISCONNECT = 1 << 11;
static final int MASK_CLOSE = 1 << 12;
static final int MASK_DEREGISTER = 1 << 13;
static final int MASK_READ = 1 << 14;
static final int MASK_WRITE = 1 << 15;
static final int MASK_FLUSH = 1 << 16;
// 事件集合
static final int MASK_ONLY_INBOUND = MASK_CHANNEL_REGISTERED |
MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;
static final int MASK_ONLY_OUTBOUND = MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;
}
private static boolean skipContext(
AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
// Ensure we correctly handle MASK_EXCEPTION_CAUGHT which is not included in the MASK_EXCEPTION_CAUGHT
// 通过位运算,筛选实现了某个事件方法的 HandlerContext
return (ctx.executionMask & (onlyMask | mask)) == 0 ||
// We can only skip if the EventExecutor is the same as otherwise we need to ensure we offload
// everything to preserve ordering.
//
// See https://github.com/netty/netty/issues/10067
(ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}

使用位运算效率高,一个 int 整数可以表示 32 个事件,原理不难,我们可以学习一下这种优化方式

MASK_A = 1 = 0000 0001
MASK_B = 1 << 1 = 0000 0010
MASK_C = 1 << 2 = 0000 0100
| 运算可以合并事件
表示 A 事件和 B 事件的集合
MASK_AB = MASK_A | MASK_B = 0000 0011
& 运算可以过滤事件
表示 A 事件在 AB 事件集合中
MASK_A & MASK_AB = 0000 0001
表示 C 事件不在 AB 事件集合中
MASK_C & MASK_AB = 0000 0000

invokeBind()#

找到实现了 bind() 方法的 Handler 的 HandlerContext 后,调用 invokeBind() 方法,我们看一下它的实现细节,很经典的责任链模式,调用 Handler 的 bind() 方法,实际上会通过 HandlerContext 调用 bind() 方法进行递归

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
// invokeHandler() 防止 handler 还没添加完成,就开始接收事件
if (invokeHandler()) {
try {
// DON'T CHANGE
// Duplex handlers implements both out/in interfaces causing a scalability issue
// see https://bugs.openjdk.org/browse/JDK-8180450
final ChannelHandler handler = handler();
// 获取第一个 HandlerContext
final DefaultChannelPipeline.HeadContext headContext = pipeline.head;
if (handler == headContext) {
// 因为第一个 HandlerContext 是 netty 定义的,由它执行 unsafe.bind 比较安全
// 第一个 HandlerContext 执行 bind 对端口进行绑定
headContext.bind(this, localAddress, promise);
} else if (handler instanceof ChannelDuplexHandler) {
// 递归调用 bind 方法,继续找前一个 HandlerContext
((ChannelDuplexHandler) handler).bind(this, localAddress, promise);
} else if (handler instanceof ChannelOutboundHandlerAdapter) {
((ChannelOutboundHandlerAdapter) handler).bind(this, localAddress, promise);
} else {
((ChannelOutboundHandler) handler).bind(this, localAddress, promise);
}
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}

递归出口是找到第一个 HeadContext,因为第一个 HandlerContext 是 Netty 定义的,由它执行 unsafe.bind() 对端口执行绑定比较安全(unsafe 一般不暴露给用户)

可能读者会纳闷,为什么不从一开始就通过第一个 HeadContext 执行 bind() 方法?

  • 因为要传递 bind 出站事件,通知所有的 Handler,例如 LoggingHandler,记录事件日志

绑定的逻辑不多了,我们大概过一下

@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
unsafe.bind(localAddress, promise);
}
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
// ...
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
// 递归执行 fireChannelActive 方法,激活 Channel 事件
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().bind(localAddress, config.getBacklog());
}
@Override
public ServerSocketChannel bind(SocketAddress local, int backlog) throws IOException {
synchronized (stateLock) {
ensureOpen();
if (localAddress != null)
throw new AlreadyBoundException();
// 根据操作系统,选择调用
if (isUnixSocket()) {
localAddress = unixBind(local, backlog);
} else {
localAddress = netBind(local, backlog);
}
}
return this;
}

最后根据操作系统,选择调用 unixBind() 或者 netBind(),后面的逻辑就不看了,执行完绑定后,会调用 pipeline.fireChannelActive() 方法,传递 active 入站事件

readIfIsAutoRead()#

在这个 active 事件中,先通过第一个 HeadContext 调用 channelActive() 方法,然后在方法中递归调用 fireChannelActive() 方法传递 active 事件,最后调用 readIfIsAutoRead() 开启 Channel 的 read 事件

@Override
public final ChannelPipeline fireChannelActive() {
if (head.executor().inEventLoop()) {
if (head.invokeHandler()) {
head.channelActive(head);
} else {
head.fireChannelActive();
}
} else {
head.executor().execute(this::fireChannelActive);
}
return this;
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 递归调用 fireChannelActive()
ctx.fireChannelActive();
// Channel 上开启读事件(OP_ACCEPT)的监听
readIfIsAutoRead();
}

fireChannelActive() 的事件传递就不展开说了,重点关注 readIfIsAutoRead() 方法,它开启了一个 read 出站事件,又开始一轮事件传递的递归调用,事件传递的代码都差不多,可以省略

  • readIfIsAutoRead()
    • channel.read()
      • pipeline.read()
        • tail.read() # 从尾到头传递 read 事件
          • unsafe.beginRead() # 直到 HeadContext 的 read() 方法执行它
            • AbstractNioChannel.doBeginRead()
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
IoRegistration registration = this.registration;
if (registration == null || !registration.isValid()) {
return;
}
// 设置 readPending
readPending = true;
// 注册 OP_ACCEPT 监听事件
addAndSubmit(readOps);
}
protected void addAndSubmit(NioIoOps addOps) {
// 获取当前 selector 注册的事件
int interestOps = selectionKey().interestOps();
// 如果没有当前的事件,注册该事件
if (!addOps.isIncludedIn(interestOps)) {
try {
registration().submit(NioIoOps.valueOf(interestOps).with(addOps));
} catch (Exception e) {
throw new ChannelException(e);
}
}
}

最后通过 IoRegistration 向 selector 注册 OP_ACCEPT 监听事件

到此,ServerBootstrap 的启动流程完成

总结#

  • Handler 初始化
    • 通过 ChannelInitializer 添加用户设置的 Handler,执行完后把它从 ChannelPipeline 中移除
    • 当 server 接收到客户端的连接请求,通过 ServerBootstrapAcceptor,向 ChannelPipeline 添加用户设置的 ChildHandler
  • Netty 事件传递机制(责任链模式 + 二进制掩码过滤)
    • 入站:从 HeadContext 递归遍历到 TailContext,调用对应事件方法
    • 出站:从 TailContext 递归遍历到 HeadContext,调用对应事件方法
  • 事件通知
    • register 入站事件在 ChannelInitializer 执行完初始化后通知
    • bind 出站事件在 bind 方法执行过程中通知
    • active 入站事件在 bind 方法执行完成后通知
    • read 出站事件在 active 事件通知完后,再开启通知
  • 核心组件的关联关系,一图流

图1

Netty4.2 - Server 启动源码跟踪
https://cloop.zone.id/posts/technology/netty-server-bootstrap/
作者
Cloop
发布于
2025-07-15
许可协议
CC BY-NC-SA 4.0