当前位置:首页 > Web开发 > 正文

从Netty EventLoop实现上可以学到什么

2024-03-31 Web开发

本文主要讨论Netty NioEventLoop原理及实践,关于Netty NioEventLoop,首先要知道NioEventLoop是什么,为什么它会是Netty核心Reactor处理器,实现原理是什么,进而再讨论Netty对其的实现及使用上我们可以学到哪些。

EventLoop是一个Reactor模型的事件处理器,一个EventLoop对应一个线程,其内部会维护一个selector和taskQueue,负责处理客户端请求和内部任务,内部任务如ServerSocketChannel注册、ServerSocket绑定和延时任务处理等操作。

EventLoop是由事件驱动的,比如IO事件和任务等,IO事件即selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys方法触发。处理完请求时间之后,会处理内部添加到taskQueue中的任务,如register0、bind0等任务,由runAllTasks方法触发。注意NioEventLoop在Linux中默认底层是基于epoll机制。

技术图片

上图是EventLoop的核心流程图,如果从Netty整体视角看EventLoop的事件流转,下图来的更直观:

技术图片

注意:bossGroup和WorkerGroup中的NioEventLoop流程是一致的,只不过前者处理Accept事件之后将连接注册到后者,由后者处理该连接上后续的读写事件。

大致了解了NioEventLoop之后,不知道有没有小伙伴有这样的疑问,为什么Netty要这样实现呢,这种实现方案对于我们后续开发如何借鉴呢?关于这些疑问,本文最后讨论哈 :)

EventLoop实现原理

EventLoop是一个Reactor模型的事件处理器,一个EventLoop对应一个线程,其内部会维护一个selector和taskQueue,负责处理IO事件和内部任务。IO事件和内部任务执行时间百分比通过ioRatio来调节,ioRatio表示执行IO时间所占百分比。任务包括普通任务和已经到时的延迟任务,延迟任务存放到一个优先级队列PriorityQueue中,执行任务前从PriorityQueue读取所有到时的task,然后添加到taskQueue中,最后统一执行task。

事件处理机制

EventLoop是由事件驱动的,比如IO事件即selectionKey中ready的事件,如accept、connect、read、write等,处理的核心逻辑主要是在NioEventLoop.run方法中,,流程如下:

protected void run() { for (;;) { /* 如果hasTasks,则调用selector.selectNow(),非阻塞方式获取channel事件,没有channel事件时可能返回为0。这里用非阻塞方式是为了尽快获取连接事件,然后处理连接事件和内部任务。*/ switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); if (wakenUp.get()) { selector.wakeup(); } default: } cancelledKeys = 0; needsToSelectAgain = false; /* ioRatio调节连接事件和内部任务执行事件百分比 * ioRatio越大,连接事件处理占用百分比越大 */ final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } }

从代码上,在执行select()前有一个hasTasks()的操作,这个hasTasks()方法判断当前taskQueue是否有元素。如果taskQueue中有元素,执行 selectNow() 方法,最终执行selector.selectNow(),该方法会立即返回,保证了EventLoop在有任务执行时不会因为IO事件迟迟不来造成延后处理,这里优先处理IO事件,然后再处理任务。

如果当前taskQueue没有任务时,就会执行select(wakenUp.getAndSet(false))方法,代码如下:

/* 这个方法解决了Nio中臭名昭著的bug:selector的select方法导致空轮询 cpu100% */ private void select(boolean oldWakenUp) throws IOException { Selector selector = this.selector; try { int selectCnt = 0; long currentTimeNanos = System.nanoTime(); /* delayNanos(currentTimeNanos):计算延迟任务队列中第一个任务的到期执行时间(即最晚还能延迟多长时间执行),默认返回1s。每个SingleThreadEventExecutor都持有一个延迟执行任务的优先队列PriorityQueue,启动线程时,往队列中加入一个任务。*/ long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { /* 如果延迟任务队列中第一个任务的最晚还能延迟执行的时间小于500000纳秒,且selectCnt == 0(selectCnt 用来记录selector.select方法的执行次数和标识是否执行过selector.selectNow()),则执行selector.selectNow()方法并立即返回。*/ long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L; if (timeoutMillis <= 0) { if (selectCnt == 0) { selector.selectNow(); selectCnt = 1; } break; } if (hasTasks() && wakenUp.compareAndSet(false, true)) { selector.selectNow(); selectCnt = 1; break; } // 超时阻塞select int selectedKeys = selector.select(timeoutMillis); selectCnt ++; System.out.println(selectCnt); // 有事件到来 | 被唤醒 | 有内部任务 | 有定时任务时,会返回 if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break; } long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { // 阻塞超时后没有事件到来,重置selectCnt selectCnt = 1; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // Selector重建 rebuildSelector(); selector = this.selector; // Select again to populate selectedKeys. selector.selectNow(); selectCnt = 1; break; } currentTimeNanos = time; } } catch (CancelledKeyException e) { // Harmless exception - log anyway } }

当java NIO bug触发时,进行Selector重建,rebuildSelector过程如下:

通过方法openSelector创建一个新的selector。

将old selector的selectionKey执行cancel。

将old selector的channel重新注册到新的selector中。

Netty的连接处理就是IO事件的处理,IO事件包括读事件、ACCEPT事件、写事件和OP_CONNECT事件:

ACCEPT事件:连接建立好之后将该连接的channel注册到workGroup中某个NIOEventLoop的selector中;

READ事件:从channel中读取数据,存放到byteBuf中,触发后续的ChannelHandler来处理数据;

温馨提示: 本文由Jm博客推荐,转载请保留链接: https://www.jmwww.net/file/web/42023.html