4592 字
23 分钟
Netty4.2 - Server 建立连接源码跟踪

前言#

在写这篇文章的时候,阅读源码遇到不懂的地方,经常会使用 AI 辅助理解,好像搜索资料,寻找优秀博文的频率变得越来越少了,不禁让人思考现在 AI 已经这么便利,还需要自己阅读源码吗?直接交给 AI 分析是不是更快,似乎连写这篇文章都没有必要了,想到这里,我陷入了沉思

和我有一样想法的人应该有很多,不止是阅读源码,平时遇到很多问题都可以使用 AI 帮助我们解决,能够使用如此有力的工具,何乐不为?

唯物辩证法告诉我们,事物普遍存在矛盾性,即对立统一。如果长期强依赖于使用 AI 解决问题,可能会慢慢失去一些独立思考、拆解问题等基础能力,在大多数人逐渐依赖使用 AI 解决问题的趋势下,如果我们能利用 AI 提升自己的基础能力,反过来更高效地运用 AI,这是一种螺旋式上升的过程,抓住机遇,也许就能在 AI 时代的浪潮下逆流而上

无论写博客还是阅读源码,我对此并不感到悲观,只要能在利用 AI 辅助理解的过程中提升我们的基础能力,倒也不必纠结所谓的意义,关键在于如何向 AI 提问问题

在此抛砖引玉

I/O 多路复用#

server 接收到连接请求会先进入 SingleThreadEventExecutor 的 doStartThread() 方法在一个死循环中开启 EventLoop 线程的主循环,然后跳出这个死循环,所以实际只执行了一次

这样做可以根据状态进行重试或者跳出循环,并且预留了扩展

private void doStartThread() {
executor.execute(new Runnable() {
@Override
public void run() {
// 获取锁
processingLock.lock();
// ...
try {
for (;;) {
// 开启 bossGroup 线程 EventLoop 的主循环
SingleThreadEventExecutor.this.run();
success = true;
int currentState = state;
if (canSuspend(currentState)) {
if (!STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this,
ST_SUSPENDING, ST_SUSPENDED)) {
// Try again as the CAS failed.
continue;
}
if (!canSuspend(ST_SUSPENDED) && STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this,
ST_SUSPENDED, ST_STARTED)) {
// Seems like there was something added to the task queue again in the meantime but we
// were able to re-engage this thread as the event loop thread.
continue;
}
suspend = true;
}
// 跳出循环
break;
}
} catch (Throwable t) {
unexpectedException = t;
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// ...
}
}
});
}

这个线程就是上一篇文章中,bossGroup 选择的 EventLoop 线程,绑定端口并且监听 OP_ACCEPT 事件

线程中通过 do… while() 不断循环干两件事

  • runIo() 监听和处理 I/O 事件
  • runAllTasks() 执行 EventLoop 线程任务

这就是 IO 多路复用,只不过在 Netty 中多了 EventLoop 线程任务

@Override
protected void run() {
// 判断是否在 EventLoop 线程中
assert inEventLoop();
// 执行 ioHandler 的初始化(生命周期钩子函数)
ioHandler.initialize();
do {
// 处理 I/O 就绪事件
runIo();
if (isShuttingDown()) {
// ioHandler 销毁前执行(生命周期钩子函数)
ioHandler.prepareToDestroy();
}
// Now run all tasks for the maximum configured amount of time before trying to run IO again.
// 在规定时间内执行所有 EventLoop 的异步任务
runAllTasks(maxTaskProcessingQuantumNs);
// We should continue with our loop until we either confirmed a shutdown or we can suspend it.
// 如源码注释所说,这里应该继续循环,直到我们认为可以停止
} while (!confirmShutdown() && !canSuspend());
}
protected int runIo() {
assert inEventLoop();
// 通过 ioHandler 执行
return ioHandler.run(context);
}
@Override
public int run(IoHandlerContext context) {
int handled = 0;
try {
try {
// 有任务调用一次 selectNow,及时处理 I/O 事件
// 无任务,进入 SELECT 阻塞
switch (selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock())) {
case SelectStrategy.CONTINUE:
return 0;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
// 通过 selector.select() 阻塞 I/O 监听事件
select(context, wakenUp.getAndSet(false));
// wakeUp 变量表示是否唤醒 selector
// 每次新一轮循环,wakeUp 设置为 false
// 当有任务执行时,会设置为 true 唤醒线程
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
handleLoopException(e);
return 0;
}
cancelledKeys = 0;
needsToSelectAgain = false;
// 处理 SelectedKey
handled = processSelectedKeys();
} catch (Error e) {
throw e;
} catch (Throwable t) {
handleLoopException(t);
}
return handled;
}

ioHandler.run() 方法中通过策略模式判断是否有任务要执行

  • 有,调用一次 selectNow()
  • 无,进入 SELECT 阻塞

因为 selectNow() 立即返回就绪事件,不会阻塞线程,所以在执行 runAllTasks() 之前可以及时处理 I/O 事件

我们继续跟踪 select(context, wakenUp.getAndSet(false)) 方法,看看 Netty 是如何监听 I/O 事件的

避免 JDK 空轮询 Bug#

在这个方法中,主要通过 for 循环不断调用 selector.select(timeoutMillis) 方法阻塞线程,直到有任务要执行,跳出循环,执行后面的 runAllTasks() 方法

timeoutMillis 是根据下一个任务执行时间动态计算的

因有任务要执行而跳出循环的情况有以下两种:

  • 执行任务把 wakenUp 设置为 true
  • timeoutMillis <= 0,达到执行任务时间

其中涉及到如何处理 JDK 的空轮询 Bug

JDK 空轮询 Bug

Java NIO 的 Selector.select() 在某些异常情况下,没有任何 I/O 事件,但持续频繁地返回事件数量 0,导致 CPU 占用率飙升至 100%

Netty 并没有实际解决 JDK 的空轮询 Bug,而是通过空轮询计数 + 重建 Selector,避免了这个 Bug

  • selector.select(timeoutMillis) 设置阻塞超时时间
  • 计数 + 1,判断是否阻塞超时,如果正常超时重置计数为 1
  • 如果计数超出 512 次,认为触发了空轮询 Bug
  • 重新创建 Selector
  • 通过 IoRegistration 把旧的 Channels 注册到新的 Selector 上
private void select(IoHandlerContext runner, boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
// 根据下一个任务执行时间判断 selector.select() 可以阻塞多久
// 截止时间 = 当前时间 + 队列第一个任务剩余执行时间
long selectDeadLineNanos = currentTimeNanos + runner.delayNanos(currentTimeNanos);
for (;;) {
// 计算超时,防止一直阻塞,每次循环最后会更新 currentTimeNanos
// 阻塞时间 = 截止时间 - 当前时间 + 0.5毫秒(四舍五入)/ 1000000L(转毫秒)
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
// 小于等于0,说明任务该执行了,跳出循环
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
if (!runner.canBlock() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// 规定时间内执行 select
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
// 如果有事件、任务唤醒等操作,跳出循环
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || !runner.canBlock()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
if (Thread.interrupted()) {
// Thread was interrupted so reset selected keys and break so we not run into a busy loop.
// As this is most likely a bug in the handler of the user or it's client library we will
// also log it.
//
// See https://github.com/netty/netty/issues/2426
if (logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely because " +
"Thread.currentThread().interrupt() was called. Use " +
"NioHandler.shutdownGracefully() to shutdown the NioHandler.");
}
selectCnt = 1;
break;
}
long time = System.nanoTime();
// 当前时间 - 阻塞时间 >= 循环开始时间
// 等同于 阻塞时间 >= select 执行时间(当前时间 - 循环开始时间)
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
// 正常超时 selectCnt 重置为 1
selectCnt = 1;
// 超出阈值,重建 selector
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
// 更新当前时间
currentTimeNanos = time;
}
//...
} catch (CancelledKeyException e) {
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
// Harmless exception - log anyway
}
}

重建 Selector 的逻辑我们大概跟一下,先创建 JDK 的 Selector,使用反射篡改 JDK 原生 Selector 对象内部的私有字段为数组,最后通过 IoRegistration 把旧的 channels 注册到新的 selector 中

void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
if (oldSelector == null) {
return;
}
try {
// 创建新的 selector
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
// 通过 IoRegistration 把旧的 channels 注册到新的 selector
int nChannels = 0;
for (SelectionKey key : oldSelector.keys()) {
DefaultNioRegistration handle = (DefaultNioRegistration) key.attachment();
try {
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
handle.register(newSelectorTuple.unwrappedSelector);
nChannels++;
} catch (Exception e) {
logger.warn("Failed to re-register a NioHandle to the new Selector.", e);
handle.cancel();
}
}
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
//...
}
private SelectorTuple openSelector() {
final Selector unwrappedSelector;
try {
// 创建 JDK NIO 的 selector
unwrappedSelector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
return Class.forName(
"sun.nio.ch.SelectorImpl",
false,
PlatformDependent.getSystemClassLoader());
} catch (Throwable cause) {
return cause;
}
}
});
//...
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass;
final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
// 反射替换 selectedKeySet 的数据结构,把 HashSet 替换成数组
Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() {
@Override
public Object run() {
try {
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) {
// Let us try to use sun.misc.Unsafe to replace the SelectionKeySet.
// This allows us to also do this in Java9+ without any extra flags.
long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField);
long publicSelectedKeysFieldOffset =
PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) {
PlatformDependent.putObject(
unwrappedSelector, selectedKeysFieldOffset, selectedKeySet);
PlatformDependent.putObject(
unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet);
return null;
}
// We could not retrieve the offset, lets try reflection as last-resort.
}
//...
selectedKeysField.set(unwrappedSelector, selectedKeySet);
publicSelectedKeysField.set(unwrappedSelector, selectedKeySet);
return null;
} catch (NoSuchFieldException | IllegalAccessException e) {
return e;
}
}
});
//...
selectedKeys = selectedKeySet;
}

processSelectedKeys()#

监听到新的连接后(OP_ACCEPT),就会跳出 select 阻塞的循环,调用 processSelectedKeys() 方法处理 selectedKeys

遍历 selectedKeys 的方式有两种

  • 优化过,通过创建 Selector 时反射替换的数组遍历
  • 没优化过,直接使用原来的 HashSet 迭代器遍历

因为使用迭代器每次遍历都会创建一个新的对象,可能会造成频繁 GC,一般来说,Netty 默认都是使用优化过的处理方式遍历 key

这种优化方式不太常规,如果后续 JDK 有改动,反射可能会失效,但目前来说收益远大于风险,是一种 破坏封装换取极致性能 的权衡

public int run(IoHandlerContext context) {
//...
select(context, wakenUp.getAndSet(false));
// 处理 SelectedKey
handled = processSelectedKeys();
//...
return handled;
}
private int processSelectedKeys() {
// selector 创建的时候通过反射复制 selectedKeys
if (selectedKeys != null) {
// 优化处理,使用数组
return processSelectedKeysOptimized();
} else {
// 没有优化的处理,使用迭代器
return processSelectedKeysPlain(selector.selectedKeys());
}
}
private int processSelectedKeysOptimized() {
int handled = 0;
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
// 有些 Channel 已经关闭,但 key 还存在,影响 GC
selectedKeys.keys[i] = null;
// 处理 key
processSelectedKey(k);
++handled;
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
// 清除后面的 selectedKey,防止 Channel 关闭 key 没有遍历到
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
return handled;
}
private void processSelectedKey(SelectionKey k) {
// 从 SelectionKey 获取 IoRegistration
final DefaultNioRegistration registration = (DefaultNioRegistration) k.attachment();
if (!registration.isValid()) {
try {
registration.handle.close();
} catch (Exception e) {
logger.debug("Exception during closing " + registration.handle, e);
}
return;
}
// 从 SelectionKey 中获取操作数,通过 IoRegistration 的 handle 判断事件掩码
// 选择对应的 Channel 执行
registration.handle(k.readyOps());
}

最后把 key 的操作数交给 IoRegistration 的 IoHandle 选择对应的 Channel 处理事件,利用二进制掩码筛选 OP_ACCPET 事件是否在集合中,然后执行 read() 方法

NioUnsafe.read()#

这里的 read() 方法有两种实现

  • 建立连接监听 OP_ACCPET 事件是 AbstractNioMessageChannel 里的 NioByteUnsafe
  • 后续监听 OP_READ 事件是 AbstractNioByteChannel 里的 NioMessageUnsafe

在上一篇文章,io.netty.example.echo.EchoServer 的案例里 ServerBootstrap 配置的 Channel 是 NioServerSocketChannel.class

ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
//...

我们看一下它的继承关系

图1

NioServerSocketChannel 继承了 AbstractNioMessageChannel,我们回顾一下,在后续的注册逻辑中 DefaultNioRegistration 会把 AbstractNioMessageChannel 的 unsafe 实例关联到 handle,最后把 OP_ACCPET 事件注册到 Selector

所以这里监听到 OP_ACCPET 事件,执行的 read() 方法是 AbstractNioMessageChannel 的 NioByteUnsafe 实现

@Override
public void handle(IoRegistration registration, IoEvent event) {
try {
NioIoEvent nioEvent = (NioIoEvent) event;
// 获取当前事件的掩码
NioIoOps nioReadyOps = nioEvent.ops();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if (nioReadyOps.contains(NioIoOps.CONNECT)) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
removeAndSubmit(NioIoOps.CONNECT);
unsafe().finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if (nioReadyOps.contains(NioIoOps.WRITE)) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to
// write
forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// 通过位运算,判断当前事件掩码是否在 read 和 accept 事件集合中
if (nioReadyOps.contains(NioIoOps.READ_AND_ACCEPT) || nioReadyOps.equals(NioIoOps.NONE)) {
// 有两个实现,建立连接监听 OP_ACCPET 事件是 AbstractNioMessageChannel
// 后续监听 OP_READ 事件是 AbstractNioByteChannel
read();
}
} catch (CancelledKeyException ignored) {
close(voidPromise());
}
}

在 AbstractNioMessageChannel 的 read() 方法中,会先通过 RecvByteBufAllocator 分配缓冲区大小,然后执行 doReadMessages() 方法

在 doReadMessages() 方法的实现里会把新连接当作消息,先创建一个 JDK 原生的 Socket 连接封装到 NioSocketChannel 并且指定 OP_READ 操作数,然后放到 readBuf 中

最后遍历 readBuf 并调用 fireChannelRead() 方法传递 read 事件

RecvByteBufAllocator

内存大小预测器,可以计算预测下次申请 byteBuf 的容量大小,达到提升性能和节省内存的目的

@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
// 获取内存大小预测器,分配缓冲区
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
// 重置内存大小预测器的属性
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// 服务端这里会创建 channel 放到 readBuf 中
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// 增加已读信息数
allocHandle.incMessagesRead(localRead);
} while (continueReading(allocHandle));
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
// 是否等待下一次读事件
readPending = false;
// 传递 read 事件
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
// 读结束,根据这次读取的字节数,动态调整下次的预测
allocHandle.readComplete();
// 传递 readComplete 事件
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
// 使用 JDK 创建一个 socket 连接
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
// 将 socket 连接包装为 channel
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
// 指定 OP_READ 操作数
super(parent, ch, SelectionKey.OP_READ);
}

最终会在 channelRead() 方法中通过轮询的负载均衡算法选择一个 ChildGroup 的 Worker 线程注册 Channel 并添加一个关闭的监听器

@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
// 添加 childHandler
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
//...
try {
// 选择一个 worker 线程注册 channel,并且添加一个关闭的监听器
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public EventExecutor next() {
// 通过轮询的负载均衡算法选择一个 worker 线程
return executors[idx.getAndIncrement() & executors.length - 1];
}

这里的 register 注册逻辑和 上一篇文章 是一样的

PS:但要提醒一下,注册流程会创建新的 IoRegistration,关联 NioSocketChannel 的 unsafe 实例,把自己绑定到 key 的 attachment 中

都会进入到 register0() 方法,主要区别是

  • 初始化 ChildHandler 的 ChannelInitializer
  • isActive() 为 true

会直接调用 fireChannelActive() 方法传递 active 事件

private void register0(ChannelPromise promise) {
//...
// 初始化 ChildHandler 的 ChannelInitializer
pipeline.invokeHandlerAddedIfNeeded();
// 设置状态,通知 promise 的 Listener
safeSetSuccess(promise);
// 递归调用 ChannelHandlerContext 所有的 channelRegistered 方法(事件传递)
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
//...
// selector 注册到 IoHandle 然后通过 IoRegistration 注册 IoHandle 到 IoHandler
doRegister(registerPromise);
}

上一篇文章 中有提到 fireChannelActive() 这个方法里会调用 readIfIsAutoRead() 方法开启一个 read 出站事件

最终进入到 doBeginRead() 方法,向 Selector 注册 OP_READ 操作数

protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
IoRegistration registration = this.registration;
if (registration == null || !registration.isValid()) {
return;
}
readPending = true;
addAndSubmit(readOps);
}
protected void addAndSubmit(NioIoOps addOps) {
int interestOps = selectionKey().interestOps();
if (!addOps.isIncludedIn(interestOps)) {
try {
// 向 selector 注册 OP_READ 操作数
registration().submit(NioIoOps.valueOf(interestOps).with(addOps));
} catch (Exception e) {
throw new ChannelException(e);
}
}
}

最后调用 fireChannelReadComplete() 方法传递 readComplete 事件,到此 server 建立连接的流程走完,开始下一轮循环继续监听新连接

后续 OP_READ 事件的监听,都是在 ChildGroup 的 Worker 线程中进行的

可以看出 Netty 的主从 Reactor 线程模型

  • 主 Reactor (Boss 线程)负责监听 OP_ACCPET 事件
  • 从 Reactor (Worker 线程)负责注册和处理 OP_READ / OP_WRITE 事件

Reactor 线程模型

一个线程循环调用 Selectorselect() 方法(IO 多路复用)进行监听,作为一个 Reactor

本质是 I/O 多路复用 + 线程池

当监听到 OP_READ 事件,执行流程都是差不多的,只是从 key 中获取的 IoRegistration 不同,通过 IoHandle 执行的 Channel 不同,走的是 AbstractNioByteChannel 的 read() 方法读取消息

总结#

优化:

  • I/O 多路复用,循环调用 selector.select(timeoutMillis) 阻塞线程,监听 I/O 事件,如果有 Eventloop 任务需要执行,就唤醒线程执行任务
  • 通过空轮询计数 + 重建 Selector 的方式避免 JDK 空轮询 Bug
  • 监听到 I/O 事件,使用创建 Selector 时反射替换 selectedKeys 的数据结构优化遍历事件集合,提高效率
  • 通过二进制掩码筛选事件类型
  • 使用 RecvByteBufAllocator 计算预测下次申请 byteBuf 的容量大小提高效率

流程:

  • Netty 是通过 BossGroup 的 EventLoop 线程不断循环调用 select(timeoutMillis) 阻塞线程监听 I/O 事件,如果有 EventLoop 任务需要执行,就唤醒线程执行任务
  • 当有新连接建立时,server 端会监听到 OP_ACCPET 事件的 key,然后通过 key 获取到对应的 IoRegistration 通过 IoHandle 执行 AbstractNioMessageChannel 的 read() 方法处理消息
  • 通过把新连接当作消息,创建一个 JDK 原生的 Socket 连接封装到 NioSocketChannel 指定 OP_READ 操作数,交给 ChannelPipeline 当作消息传递 read 入站事件
  • 通过轮询的负载均衡方式选择 ChildGroup 的一个 Worker 线程执行注册 NioSocketChannel
  • 在 Worker 线程中传递 read 出站事件,完成注册 OP_READ 事件的逻辑
Netty4.2 - Server 建立连接源码跟踪
https://cloop.zone.id/posts/technology/netty-server-connect/
作者
Cloop
发布于
2025-08-15
许可协议
CC BY-NC-SA 4.0