欢迎访问shiker.tech

请允许在我们的网站上展示广告

您似乎使用了广告拦截器,请关闭广告拦截器。我们的网站依靠广告获取资金。

Netty如何创建和管理事件循环?
(last modified Dec 28, 2024, 12:40 AM )
by
侧边栏壁纸
  • 累计撰写 194 篇文章
  • 累计创建 66 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

Netty如何创建和管理事件循环?

橙序员
2023-08-13 / 0 评论 / 0 点赞 / 485 阅读 / 1,825 字 / 正在检测百度是否收录... 正在检测必应是否收录...
文章摘要(AI生成)

使用EventLoopGroup创建事件循环组时,会先创建MultithreadEventExecutorGroup和SingleThreadEventExecutor两个线程池。MultithreadEventExecutorGroup会创建事件组线程池和事件循环数组,并通过选择器选择下一个事件循环。SingleThreadEventExecutor会创建事件线程池、执行事件循环,并维护任务队列。单线程事件线程池通过execute方法确保只有一个线程在执行任务。事件循环主要是SingleThreadEventExecutor的子类,实现了run方法。通过类关系图,了解了事件循环组的创建流程和内部实现。

创建事件循环组

当我们使用EventLoopGroup parentGroup = new NioEventLoopGroup();这样的代码创建事件循环组时,其创建的流程是这样的:

image-20230813142507259

结合我们在上篇文章中事件循环相关的类关系图:

image-20230813160808713

其中我们主要关心两个线程池:MultithreadEventExecutorGroupSingleThreadEventExecutor

多线程事件线程池组

多线程事件循环组MultithreadEventExecutorGroup在构造时创建下面这几个对象:

  1. 创建事件组线程池,负责使用线程工厂new ThreadPerTaskExecutor(newDefaultThreadFactory());创建和执行线程

    这个线程池中的线程为非守护线程,线程名称有统一前缀,命名统一为StringUtil.simpleClassName(getClass())-poolId.incrementAndGet()-nextId.incrementAndGet(),即在当前事件循环组中, 线程命名为nioEventLoopGroup-1-1,线程优先级为5

  2. 创建事件循环数组,存放NioEventLoop集合。这个集合中每个元素都是调用newChild方法创建的,其中NioEventLoopGroup对此的实现为:

        @Override
        protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                ((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
        }
    
  3. 创建对事件循环数组的选择器,其next()方法负责对事件循环数组进行轮询,选择器根据数组大小是否为2的整数次幂有两种实现

    1. 如果是2的整数次幂,则next()方法通过公式executors[idx.getAndIncrement() & executors.length - 1]获取下一个事件循环(对应选择器PowerOfTwoEventExecutorChooser
    2. 如果不是2的整数次幂,则next()方法通过公式 executors[Math.abs(idx.getAndIncrement() % executors.length)]获取下一个事件循环(对应选择器GenericEventExecutorChooser

此时事件线程池组的创建还是由用户发起的线程所操作的,如果我们是main函数调用,则是main主线程。

多线程事件线程池组的线程执行

在多线程事件循环组MultithreadEventExecutorGroup的父类AbstractEventExecutorGroup中,我们可以找到其重写的execute方法:

    @Override
    public void execute(Runnable command) {
        next().execute(command);
    }

可以看到事件线程池组的线程被调用时,会逐次使用下一个事件循环的执行方法进行执行

单线程事件线程池

单线程事件线程池SingleThreadEventExecutor在构造时创建的对象如下:

  1. 创建事件线程池,负责使用ThreadExecutorMap.apply(executor, this)创建和执行事件循环,其apply方法主要创建线程池来执行我们事件循环组中的线程池中的线程:

    new Executor() {  // 创建一个匿名内部类实例
        @Override
        public void execute(final Runnable command) {
            // 调用总executor(group所封装的executor)的execute()
            executor.execute(apply(command, eventExecutor));
        }
        
        public static Runnable apply(final Runnable command, final EventExecutor eventExecutor) {
            ObjectUtil.checkNotNull(command, "command");
            ObjectUtil.checkNotNull(eventExecutor, "eventExecutor");
            return new Runnable() {
                @Override
                public void run() {
                    setCurrentEventExecutor(eventExecutor);
                    try {
                        command.run();
                    } finally {
                        setCurrentEventExecutor(null);
                    }
                }
            };
        }
    }
    
  2. 创建任务队列,创建一个默认大小为16的LinkedBlockingQueue<Runnable>(maxPendingTasks)任务队列

如何做到单线程执行?

我们从上面构造可以看到单线程事件线程池也可以创建多个线程,那么这个单线程事件线程池是如何做到线程池只有一个线程存活在执行呢?

首先我们需要看下这个单线程池的execute方法:

image-20230813174721039

这个方法中,首先会调用inEventLoop方法判断事件循环所绑定的线程是否与当前线程是否一致,如果不一致则会创建一个新的线程。当我们第一次调用execute方法时,事件循环还没有绑定线程,所以肯定是不一致的。然后当前线程池未启动,那么这时会调用SingleThreadEventExecutor中的executorexecute执行方法,这个执行方法又会调用我们MultithreadEventExecutorGroup中使用ThreadPerTaskExecutor创建的execurorexecute执行方法,最终通过线程工厂创建一个名类似为nioEventLoopGroup-2-1的新线程。

创建完成后,我们就会将单线程池的状态更新为创建完成,所以当单线程池的execute再次被调用时,就会发现eventLoop一直在使用第一次创建的nioEventLoopGroup-2-1线程,而不会重新创建线程。

事件循环

由类关系图可知,事件循环主要是对SingleThreadEventExecutor的继承,它主要实现了run方法。这里面包括了对网络通道的轮询和任务队列的处理。

NioEventLoop中,主要包含了这几个元素:

  1. selector:对NIO的轮询器封装,由于NIO中的selectionKey使用set集合存储,为了优化存储空间,封装后的selector使用数组存储
  2. unwrappedSelector:NIO的轮询器

而在其重新run方法中,流程如下:

image-20230813181226031

其次调用轮询器的select方法执行轮询,轮询后对于由数据变更的网络通道会先判断管道状态:

  • 获取管道的就绪状态,根据不同状态执行不同操作。
  • 如果为连接就绪,则调用finishConnect方法确认连接是否已完成
  • 如果为写就绪,则调用forceFlush强制刷新管道缓冲区
  • 如果为读就绪、接收就绪、或者初始化,则调用read方法进行数据读取

然后再执行任务队列中的任务。一次循环中执行轮询的时间和执行任务队列的时间比例可以控制,默认是各占50%。

处理空轮询的bug

但是我们透过源码发现这个流程并不简单:

 private void select(boolean oldWakenUp) throws IOException {
        Selector selector = this.selector;
        try {
            int selectCnt = 0;
            long currentTimeNanos = System.nanoTime();
            long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);

            for (;;) {
                if (timeoutMillis <= 0) {
                    // selectCnt为0,表示当前还没有任何就绪channel被选择
                    if (selectCnt == 0) {
                        // 非阻塞选择
                        selector.selectNow();
                        selectCnt = 1;
                    }
                    break;
                }
                int selectedKeys = selector.select(timeoutMillis);
                selectCnt ++;

                if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                    break;
                }
                if (Thread.interrupted()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely because " +
                                "Thread.currentThread().interrupt() was called. Use " +
                                "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop.");
                    }
                    selectCnt = 1;
                    break;
                }
                if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
                    selectCnt = 1;
                } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
                        selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
                    // The code exists in an extra method to ensure the method is not too big to inline as this
                    // branch is not very likely to get hit very frequently.
                    selector = selectRebuildSelector(selectCnt);
                    selectCnt = 1;
                    break;
                }
                currentTimeNanos = time;
            }  // end-for

            if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                            selectCnt - 1, selector);
                }
            }
        } catch (CancelledKeyException e) {
            if (logger.isDebugEnabled()) {
                logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                        selector, e);
            }
            // Harmless exception - log anyway
        }
    }

此处selectRebuildSelector()的作用是重建一个轮询器,而重建论续器的条件为:

先判断轮询执行时间是否是在我们指定的轮询超时时间结束,如果在指定轮询时间之前结束。且当轮询到指定次数(默认512)便会重新创建一个新的selector。

对于多次轮询未在指定事件内提前结束的异常现象,这是由于NIO(底层使用linux的epoll命令) 的BUG导致:“若Selector的轮询结果为空,也没有wakeup或新消息处理,则发生空轮询,CPU使用率100%。”

0

评论区