前言
在写这篇文章的时候,阅读源码遇到不懂的地方,经常会使用 AI 辅助理解,好像搜索资料,寻找优秀博文的频率变得越来越少了,不禁让人思考现在 AI 已经这么便利,还需要自己阅读源码吗?直接交给 AI 分析是不是更快,似乎连写这篇文章都没有必要了,想到这里,我陷入了沉思
和我有一样想法的人应该有很多,不止是阅读源码,平时遇到很多问题都可以使用 AI 帮助我们解决,能够使用如此有力的工具,何乐不为?
唯物辩证法告诉我们,事物普遍存在矛盾性,即对立统一。如果长期强依赖于使用 AI 解决问题,可能会慢慢失去一些独立思考、拆解问题等基础能力,在大多数人逐渐依赖使用 AI 解决问题的趋势下,如果我们能利用 AI 提升自己的基础能力,反过来更高效地运用 AI,这是一种螺旋式上升的过程,抓住机遇,也许就能在 AI 时代的浪潮下逆流而上
无论写博客还是阅读源码,我对此并不感到悲观,只要能在利用 AI 辅助理解的过程中提升我们的基础能力,倒也不必纠结所谓的意义,关键在于如何向 AI 提问问题
在此抛砖引玉
I/O 多路复用
server 接收到连接请求会先进入 SingleThreadEventExecutor 的 doStartThread() 方法在一个死循环中开启 EventLoop 线程的主循环,然后跳出这个死循环,所以实际只执行了一次
这样做可以根据状态进行重试或者跳出循环,并且预留了扩展
private void doStartThread() { executor.execute(new Runnable() { @Override public void run() { // 获取锁 processingLock.lock(); // ... try { for (;;) { // 开启 bossGroup 线程 EventLoop 的主循环 SingleThreadEventExecutor.this.run(); success = true;
int currentState = state; if (canSuspend(currentState)) { if (!STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, ST_SUSPENDING, ST_SUSPENDED)) { // Try again as the CAS failed. continue; }
if (!canSuspend(ST_SUSPENDED) && STATE_UPDATER.compareAndSet(SingleThreadEventExecutor.this, ST_SUSPENDED, ST_STARTED)) { // Seems like there was something added to the task queue again in the meantime but we // were able to re-engage this thread as the event loop thread. continue; } suspend = true; }
// 跳出循环 break; } } catch (Throwable t) { unexpectedException = t; logger.warn("Unexpected exception from an event executor: ", t); } finally { // ... } } });}这个线程就是上一篇文章中,bossGroup 选择的 EventLoop 线程,绑定端口并且监听 OP_ACCEPT 事件
线程中通过 do… while() 不断循环干两件事
- runIo() 监听和处理 I/O 事件
- runAllTasks() 执行 EventLoop 线程任务
这就是 IO 多路复用,只不过在 Netty 中多了 EventLoop 线程任务
@Overrideprotected void run() { // 判断是否在 EventLoop 线程中 assert inEventLoop(); // 执行 ioHandler 的初始化(生命周期钩子函数) ioHandler.initialize(); do { // 处理 I/O 就绪事件 runIo(); if (isShuttingDown()) { // ioHandler 销毁前执行(生命周期钩子函数) ioHandler.prepareToDestroy(); } // Now run all tasks for the maximum configured amount of time before trying to run IO again. // 在规定时间内执行所有 EventLoop 的异步任务 runAllTasks(maxTaskProcessingQuantumNs);
// We should continue with our loop until we either confirmed a shutdown or we can suspend it. // 如源码注释所说,这里应该继续循环,直到我们认为可以停止 } while (!confirmShutdown() && !canSuspend());}
protected int runIo() { assert inEventLoop(); // 通过 ioHandler 执行 return ioHandler.run(context);}
@Overridepublic int run(IoHandlerContext context) { int handled = 0; try { try { // 有任务调用一次 selectNow,及时处理 I/O 事件 // 无任务,进入 SELECT 阻塞 switch (selectStrategy.calculateStrategy(selectNowSupplier, !context.canBlock())) { case SelectStrategy.CONTINUE: return 0;
case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT: // 通过 selector.select() 阻塞 I/O 监听事件 select(context, wakenUp.getAndSet(false));
// wakeUp 变量表示是否唤醒 selector // 每次新一轮循环,wakeUp 设置为 false // 当有任务执行时,会设置为 true 唤醒线程 if (wakenUp.get()) { selector.wakeup(); } // fall through default: } } catch (IOException e) { // If we receive an IOException here its because the Selector is messed up. Let's rebuild // the selector and retry. https://github.com/netty/netty/issues/8566 rebuildSelector0(); handleLoopException(e); return 0; }
cancelledKeys = 0; needsToSelectAgain = false; // 处理 SelectedKey handled = processSelectedKeys(); } catch (Error e) { throw e; } catch (Throwable t) { handleLoopException(t); } return handled;}ioHandler.run() 方法中通过策略模式判断是否有任务要执行
- 有,调用一次 selectNow()
- 无,进入 SELECT 阻塞
因为 selectNow() 立即返回就绪事件,不会阻塞线程,所以在执行 runAllTasks() 之前可以及时处理 I/O 事件
我们继续跟踪 select(context, wakenUp.getAndSet(false)) 方法,看看 Netty 是如何监听 I/O 事件的
避免 JDK 空轮询 Bug
在这个方法中,主要通过 for 循环不断调用 selector.select(timeoutMillis) 方法阻塞线程,直到有任务要执行,跳出循环,执行后面的 runAllTasks() 方法
timeoutMillis 是根据下一个任务执行时间动态计算的
因有任务要执行而跳出循环的情况有以下两种:
- 执行任务把 wakenUp 设置为 true
- timeoutMillis <= 0,达到执行任务时间
其中涉及到如何处理 JDK 的空轮询 Bug
JDK 空轮询 Bug
Java NIO 的
Selector.select()在某些异常情况下,没有任何 I/O 事件,但持续频繁地返回事件数量 0,导致 CPU 占用率飙升至 100%
Netty 并没有实际解决 JDK 的空轮询 Bug,而是通过空轮询计数 + 重建 Selector,避免了这个 Bug
- selector.select(timeoutMillis) 设置阻塞超时时间
- 计数 + 1,判断是否阻塞超时,如果正常超时重置计数为 1
- 如果计数超出 512 次,认为触发了空轮询 Bug
- 重新创建 Selector
- 通过 IoRegistration 把旧的 Channels 注册到新的 Selector 上
private void select(IoHandlerContext runner, boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); // 根据下一个任务执行时间判断 selector.select() 可以阻塞多久 // 截止时间 = 当前时间 + 队列第一个任务剩余执行时间 long selectDeadLineNanos = currentTimeNanos + runner.delayNanos(currentTimeNanos);
for (;;) { // 计算超时,防止一直阻塞,每次循环最后会更新 currentTimeNanos // 阻塞时间 = 截止时间 - 当前时间 + 0.5毫秒(四舍五入)/ 1000000L(转毫秒) long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; // 小于等于0,说明任务该执行了,跳出循环 if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; }
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call // Selector#wakeup. So we need to check task queue again before executing select operation. // If we don't, the task might be pended until select operation was timed out. // It might be pended until idle timeout if IdleStateHandler existed in pipeline. if (!runner.canBlock() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; }
// 规定时间内执行 select int selectedKeys = selector.select(timeoutMillis); selectCnt ++;
// 如果有事件、任务唤醒等操作,跳出循环 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || !runner.canBlock()) { // - Selected something, // - waken up by user, or // - the task queue has a pending task. // - a scheduled task is ready for processing break; } if (Thread.interrupted()) { // Thread was interrupted so reset selected keys and break so we not run into a busy loop. // As this is most likely a bug in the handler of the user or it's client library we will // also log it. // // See https://github.com/netty/netty/issues/2426 if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioHandler.shutdownGracefully() to shutdown the NioHandler."); } selectCnt = 1; break; }
long time = System.nanoTime(); // 当前时间 - 阻塞时间 >= 循环开始时间 // 等同于 阻塞时间 >= select 执行时间(当前时间 - 循环开始时间) if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { // timeoutMillis elapsed without anything selected. // 正常超时 selectCnt 重置为 1 selectCnt = 1; // 超出阈值,重建 selector } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The code exists in an extra method to ensure the method is not too big to inline as this // branch is not very likely to get hit very frequently. selector = selectRebuildSelector(selectCnt); selectCnt = 1; break; } // 更新当前时间 currentTimeNanos = time; } //... } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } // Harmless exception - log anyway }}重建 Selector 的逻辑我们大概跟一下,先创建 JDK 的 Selector,使用反射篡改 JDK 原生 Selector 对象内部的私有字段为数组,最后通过 IoRegistration 把旧的 channels 注册到新的 selector 中
void rebuildSelector0() { final Selector oldSelector = selector; final SelectorTuple newSelectorTuple;
if (oldSelector == null) { return; }
try { // 创建新的 selector newSelectorTuple = openSelector(); } catch (Exception e) { logger.warn("Failed to create a new Selector.", e); return; }
// Register all channels to the new Selector. // 通过 IoRegistration 把旧的 channels 注册到新的 selector int nChannels = 0; for (SelectionKey key : oldSelector.keys()) { DefaultNioRegistration handle = (DefaultNioRegistration) key.attachment(); try { if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) { continue; }
handle.register(newSelectorTuple.unwrappedSelector); nChannels++; } catch (Exception e) { logger.warn("Failed to re-register a NioHandle to the new Selector.", e); handle.cancel(); } }
selector = newSelectorTuple.selector; unwrappedSelector = newSelectorTuple.unwrappedSelector;
try { // time to close the old selector as everything else is registered to the new one oldSelector.close(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to close the old Selector.", t); } } //...}
private SelectorTuple openSelector() { final Selector unwrappedSelector; try { // 创建 JDK NIO 的 selector unwrappedSelector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); }
if (DISABLE_KEY_SET_OPTIMIZATION) { return new SelectorTuple(unwrappedSelector); }
Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { return Class.forName( "sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); } catch (Throwable cause) { return cause; } } });
//...
final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); // 反射替换 selectedKeySet 的数据结构,把 HashSet 替换成数组 Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
if (PlatformDependent.javaVersion() >= 9 && PlatformDependent.hasUnsafe()) { // Let us try to use sun.misc.Unsafe to replace the SelectionKeySet. // This allows us to also do this in Java9+ without any extra flags. long selectedKeysFieldOffset = PlatformDependent.objectFieldOffset(selectedKeysField); long publicSelectedKeysFieldOffset = PlatformDependent.objectFieldOffset(publicSelectedKeysField);
if (selectedKeysFieldOffset != -1 && publicSelectedKeysFieldOffset != -1) { PlatformDependent.putObject( unwrappedSelector, selectedKeysFieldOffset, selectedKeySet); PlatformDependent.putObject( unwrappedSelector, publicSelectedKeysFieldOffset, selectedKeySet); return null; } // We could not retrieve the offset, lets try reflection as last-resort. } //... selectedKeysField.set(unwrappedSelector, selectedKeySet); publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); return null; } catch (NoSuchFieldException | IllegalAccessException e) { return e; } } }); //... selectedKeys = selectedKeySet;}processSelectedKeys()
监听到新的连接后(OP_ACCEPT),就会跳出 select 阻塞的循环,调用 processSelectedKeys() 方法处理 selectedKeys
遍历 selectedKeys 的方式有两种
- 优化过,通过创建 Selector 时反射替换的数组遍历
- 没优化过,直接使用原来的 HashSet 迭代器遍历
因为使用迭代器每次遍历都会创建一个新的对象,可能会造成频繁 GC,一般来说,Netty 默认都是使用优化过的处理方式遍历 key
这种优化方式不太常规,如果后续 JDK 有改动,反射可能会失效,但目前来说收益远大于风险,是一种 破坏封装换取极致性能 的权衡
public int run(IoHandlerContext context) { //... select(context, wakenUp.getAndSet(false)); // 处理 SelectedKey handled = processSelectedKeys(); //... return handled;}
private int processSelectedKeys() { // selector 创建的时候通过反射复制 selectedKeys if (selectedKeys != null) { // 优化处理,使用数组 return processSelectedKeysOptimized(); } else { // 没有优化的处理,使用迭代器 return processSelectedKeysPlain(selector.selectedKeys()); }}
private int processSelectedKeysOptimized() { int handled = 0; for (int i = 0; i < selectedKeys.size; ++i) { final SelectionKey k = selectedKeys.keys[i]; // null out entry in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 // 有些 Channel 已经关闭,但 key 还存在,影响 GC selectedKeys.keys[i] = null; // 处理 key processSelectedKey(k); ++handled;
if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 // 清除后面的 selectedKey,防止 Channel 关闭 key 没有遍历到 selectedKeys.reset(i + 1);
selectAgain(); i = -1; } } return handled;}
private void processSelectedKey(SelectionKey k) { // 从 SelectionKey 获取 IoRegistration final DefaultNioRegistration registration = (DefaultNioRegistration) k.attachment(); if (!registration.isValid()) { try { registration.handle.close(); } catch (Exception e) { logger.debug("Exception during closing " + registration.handle, e); } return; } // 从 SelectionKey 中获取操作数,通过 IoRegistration 的 handle 判断事件掩码 // 选择对应的 Channel 执行 registration.handle(k.readyOps());}最后把 key 的操作数交给 IoRegistration 的 IoHandle 选择对应的 Channel 处理事件,利用二进制掩码筛选 OP_ACCPET 事件是否在集合中,然后执行 read() 方法
NioUnsafe.read()
这里的 read() 方法有两种实现
- 建立连接监听 OP_ACCPET 事件是 AbstractNioMessageChannel 里的 NioByteUnsafe
- 后续监听 OP_READ 事件是 AbstractNioByteChannel 里的 NioMessageUnsafe
在上一篇文章,io.netty.example.echo.EchoServer 的案例里 ServerBootstrap 配置的 Channel 是 NioServerSocketChannel.class
ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class)//...我们看一下它的继承关系

NioServerSocketChannel 继承了 AbstractNioMessageChannel,我们回顾一下,在后续的注册逻辑中 DefaultNioRegistration 会把 AbstractNioMessageChannel 的 unsafe 实例关联到 handle,最后把 OP_ACCPET 事件注册到 Selector
所以这里监听到 OP_ACCPET 事件,执行的 read() 方法是 AbstractNioMessageChannel 的 NioByteUnsafe 实现
@Overridepublic void handle(IoRegistration registration, IoEvent event) { try { NioIoEvent nioEvent = (NioIoEvent) event; // 获取当前事件的掩码 NioIoOps nioReadyOps = nioEvent.ops(); // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise // the NIO JDK channel implementation may throw a NotYetConnectedException. if (nioReadyOps.contains(NioIoOps.CONNECT)) { // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking // See https://github.com/netty/netty/issues/924 removeAndSubmit(NioIoOps.CONNECT);
unsafe().finishConnect(); }
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory. if (nioReadyOps.contains(NioIoOps.WRITE)) { // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to // write forceFlush(); }
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead // to a spin loop // 通过位运算,判断当前事件掩码是否在 read 和 accept 事件集合中 if (nioReadyOps.contains(NioIoOps.READ_AND_ACCEPT) || nioReadyOps.equals(NioIoOps.NONE)) { // 有两个实现,建立连接监听 OP_ACCPET 事件是 AbstractNioMessageChannel // 后续监听 OP_READ 事件是 AbstractNioByteChannel read(); } } catch (CancelledKeyException ignored) { close(voidPromise()); }}在 AbstractNioMessageChannel 的 read() 方法中,会先通过 RecvByteBufAllocator 分配缓冲区大小,然后执行 doReadMessages() 方法
在 doReadMessages() 方法的实现里会把新连接当作消息,先创建一个 JDK 原生的 Socket 连接封装到 NioSocketChannel 并且指定 OP_READ 操作数,然后放到 readBuf 中
最后遍历 readBuf 并调用 fireChannelRead() 方法传递 read 事件
RecvByteBufAllocator
内存大小预测器,可以计算预测下次申请
byteBuf的容量大小,达到提升性能和节省内存的目的
@Overridepublic void read() { assert eventLoop().inEventLoop(); final ChannelConfig config = config(); final ChannelPipeline pipeline = pipeline(); // 获取内存大小预测器,分配缓冲区 final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); // 重置内存大小预测器的属性 allocHandle.reset(config);
boolean closed = false; Throwable exception = null; try { try { do { // 服务端这里会创建 channel 放到 readBuf 中 int localRead = doReadMessages(readBuf); if (localRead == 0) { break; } if (localRead < 0) { closed = true; break; } // 增加已读信息数 allocHandle.incMessagesRead(localRead); } while (continueReading(allocHandle)); } catch (Throwable t) { exception = t; }
int size = readBuf.size(); for (int i = 0; i < size; i ++) { // 是否等待下一次读事件 readPending = false; // 传递 read 事件 pipeline.fireChannelRead(readBuf.get(i)); } readBuf.clear(); // 读结束,根据这次读取的字节数,动态调整下次的预测 allocHandle.readComplete(); // 传递 readComplete 事件 pipeline.fireChannelReadComplete();
if (exception != null) { closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception); }
if (closed) { inputShutdown = true; if (isOpen()) { close(voidPromise()); } } } finally { if (!readPending && !config.isAutoRead()) { removeReadOp(); } }}
@Overrideprotected int doReadMessages(List<Object> buf) throws Exception { // 使用 JDK 创建一个 socket 连接 SocketChannel ch = SocketUtils.accept(javaChannel());
try { if (ch != null) { // 将 socket 连接包装为 channel buf.add(new NioSocketChannel(this, ch)); return 1; } } catch (Throwable t) { logger.warn("Failed to create a new channel from an accepted socket.", t);
try { ch.close(); } catch (Throwable t2) { logger.warn("Failed to close a socket.", t2); } }
return 0;}
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) { // 指定 OP_READ 操作数 super(parent, ch, SelectionKey.OP_READ);}最终会在 channelRead() 方法中通过轮询的负载均衡算法选择一个 ChildGroup 的 Worker 线程注册 Channel 并添加一个关闭的监听器
@SuppressWarnings("unchecked")public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; // 添加 childHandler child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger); setAttributes(child, childAttrs);
//...
try { // 选择一个 worker 线程注册 channel,并且添加一个关闭的监听器 childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); }}
@Overridepublic ChannelFuture register(Channel channel) { return next().register(channel);}
@Overridepublic EventExecutor next() { // 通过轮询的负载均衡算法选择一个 worker 线程 return executors[idx.getAndIncrement() & executors.length - 1];}这里的 register 注册逻辑和 上一篇文章 是一样的
PS:但要提醒一下,注册流程会创建新的 IoRegistration,关联 NioSocketChannel 的 unsafe 实例,把自己绑定到 key 的 attachment 中
都会进入到 register0() 方法,主要区别是
- 初始化 ChildHandler 的 ChannelInitializer
- isActive() 为 true
会直接调用 fireChannelActive() 方法传递 active 事件
private void register0(ChannelPromise promise) { //... // 初始化 ChildHandler 的 ChannelInitializer pipeline.invokeHandlerAddedIfNeeded(); // 设置状态,通知 promise 的 Listener safeSetSuccess(promise); // 递归调用 ChannelHandlerContext 所有的 channelRegistered 方法(事件传递) pipeline.fireChannelRegistered();
if (isActive()) { if (firstRegistration) { pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } //...
// selector 注册到 IoHandle 然后通过 IoRegistration 注册 IoHandle 到 IoHandler doRegister(registerPromise);}在 上一篇文章 中有提到 fireChannelActive() 这个方法里会调用 readIfIsAutoRead() 方法开启一个 read 出站事件
最终进入到 doBeginRead() 方法,向 Selector 注册 OP_READ 操作数
protected void doBeginRead() throws Exception { // Channel.read() or ChannelHandlerContext.read() was called IoRegistration registration = this.registration; if (registration == null || !registration.isValid()) { return; }
readPending = true;
addAndSubmit(readOps);}
protected void addAndSubmit(NioIoOps addOps) { int interestOps = selectionKey().interestOps(); if (!addOps.isIncludedIn(interestOps)) { try { // 向 selector 注册 OP_READ 操作数 registration().submit(NioIoOps.valueOf(interestOps).with(addOps)); } catch (Exception e) { throw new ChannelException(e); } }}最后调用 fireChannelReadComplete() 方法传递 readComplete 事件,到此 server 建立连接的流程走完,开始下一轮循环继续监听新连接
后续 OP_READ 事件的监听,都是在 ChildGroup 的 Worker 线程中进行的
可以看出 Netty 的主从 Reactor 线程模型
- 主 Reactor (Boss 线程)负责监听 OP_ACCPET 事件
- 从 Reactor (Worker 线程)负责注册和处理 OP_READ / OP_WRITE 事件
Reactor 线程模型
一个线程循环调用
Selector的select()方法(IO 多路复用)进行监听,作为一个 Reactor本质是 I/O 多路复用 + 线程池
当监听到 OP_READ 事件,执行流程都是差不多的,只是从 key 中获取的 IoRegistration 不同,通过 IoHandle 执行的 Channel 不同,走的是 AbstractNioByteChannel 的 read() 方法读取消息
总结
优化:
- I/O 多路复用,循环调用 selector.select(timeoutMillis) 阻塞线程,监听 I/O 事件,如果有 Eventloop 任务需要执行,就唤醒线程执行任务
- 通过空轮询计数 + 重建 Selector 的方式避免 JDK 空轮询 Bug
- 监听到 I/O 事件,使用创建 Selector 时反射替换 selectedKeys 的数据结构优化遍历事件集合,提高效率
- 通过二进制掩码筛选事件类型
- 使用 RecvByteBufAllocator 计算预测下次申请
byteBuf的容量大小提高效率
流程:
- Netty 是通过 BossGroup 的 EventLoop 线程不断循环调用 select(timeoutMillis) 阻塞线程监听 I/O 事件,如果有 EventLoop 任务需要执行,就唤醒线程执行任务
- 当有新连接建立时,server 端会监听到 OP_ACCPET 事件的 key,然后通过 key 获取到对应的 IoRegistration 通过 IoHandle 执行 AbstractNioMessageChannel 的 read() 方法处理消息
- 通过把新连接当作消息,创建一个 JDK 原生的 Socket 连接封装到 NioSocketChannel 指定 OP_READ 操作数,交给 ChannelPipeline 当作消息传递 read 入站事件
- 通过轮询的负载均衡方式选择 ChildGroup 的一个 Worker 线程执行注册 NioSocketChannel
- 在 Worker 线程中传递 read 出站事件,完成注册 OP_READ 事件的逻辑