netty(4.1.6.Final)
2022-01-12 17:36:04 9 举报
AI智能生成
netty源码学习笔记
作者其他创作
大纲/内容
netty工作原理图
服务器端启动<br>
1. 实例化一个NioEventLoopGroup对象,最终会调用的父类MultithreadEventExecutorGroup构造<br> 该对象中包含一组NioEventLoop,默认个数为cup核数*2<br> 每个NioEventLoop包含一个任务队列,一个nio选择器和线程池(该线程池中只包一个线程,这样就不用处理多线程竞争的问题)<br> 在服务端启动时会从bossGroup中取出一个NioEventLoop并往NioEventLoop的选择器中注册accept事件<br> 然后启动NioEventLoop线程池中的那个线程监听accept事件<br> <font color="#0076b3">EventLoopGroup bossGroup = new NioEventLoopGroup();</font><br>
1. 如果nThreads小于0,则抛出异常,默认没有指定该值的时候为处理器数量的两倍<br>
2. 如果executor没有指定,则使用默认的线程池ThreadPerTaskExecutor,默认工厂为DefaultThreadFactory<br> <font color="#0076b3"> executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());</font><br>
3. 实例化一个nThread大小的EventExecutor数组<br> <font color="#0076b3">children = new EventExecutor[nThreads];</font><br>
4. 循序调用NioEventLoopGroup#newChild方法创建NioEventLoop对象为children数组赋值<br> 其中args分别为WindowsSelectorProvider,DefaultSelectStrategyFactor,RejectedExecutionHandlers<br> <font color="#0076b3"> children[i] = newChild(executor, args);</font><br>
1. 调用父类SingleThreadEventLoop构造DEFAULT_MAX_PENDING_TASKS为Integer.MAX_VALUE<br> <font color="#0076b3"> super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);</font><br>
1. 继续调用父类的SingleThreadEventExecutor的构造<br> <font color="#0076b3">super(parent);</font>
1. 调用父类AbstractScheduledEventExecutor的构造<br><font color="#0076b3"> super(parent);</font>
1. 调用父类AbstractEventExecutor构造设置parent属性为传入的EventExecutorGroup<br><font color="#0076b3"> this.parent = parent;</font><br>
2. 设置addTaskWakesUp为false<br> <font color="#0076b3">this.addTaskWakesUp = addTaskWakesUp;</font><br>
3. 设置最大等待任务数为Integer.MAX_VALUE<br> <font color="#0076b3"> this.maxPendingTasks = Math.max(16, maxPendingTasks);</font>
4. 检查线程池对象是否为空,为空抛出NullPointerException异常<br> <font color="#0076b3"> this.executor = ObjectUtil.checkNotNull(executor, "executor");</font>
5. 实例化任务队列赋值给taskQueue <br> <font color="#0076b3">taskQueue = newTaskQueue(this.maxPendingTasks);</font><br>
1. 调用PlatformDependent.newMpscQueue方法创建任务队列并返回<br> 在SingleThreadEventExecutor#newTashkQueue方法中是以LinkedBlockingQueue作为任务队列<br> 但子类NioEventLoop重写了该方法,最终返回的是MpscChunkedArrayQueue(即多生产者单消费者队列)<br> 关于jctools中的队列后续再研究<br><font color="#0076b3"> return PlatformDependent.newMpscQueue(maxPendingTasks);</font><br>
6. 检查拒绝策略是否为空,为空抛出NullPointerException异常<br> <font color="#0076b3">rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");</font>
2. 和./1.5一样,实例化一个MpscChunkedArrayQueue赋值给tailTasks<br> taskQueue的优先级高于tailTasks和定时任务,定时任务优先级高于tailTasks<br> <font color="#0076b3">tailTasks = newTaskQueue(maxPendingTasks);</font><br>
3. <font color="#0076b3">provider = selectorProvider;</font>
4. 这一步主要是通过./3中的SelectorProvider#openSelector()打开一个选择器赋值给selector属性,和nio中一样<br> <font color="#0076b3">selector = openSelector();</font><br>
5. <font color="#0076b3">selectStrategy = strategy;</font>
5. 通过传入的事件池选择工厂创建事件池选择器,选择器中有个next方法用来获取分配的NioEventLoop<br> <font color="#0076b3"> chooser = chooserFactory.newChooser(children);</font><br>
1. 如果children数量为偶数,则创建PowerOfTowEventExecutorChooser事件池选择器<br> 选择线程池的逻辑为(&运算比%运算要快,所以尽量将线程数量指定为偶数):<br> <font color="#0076b3"> executors[idx.getAndIncrement() & executors.length - 1];</font>
2. 如果children数量为奇数,则创建GenericEventExecutorChooser事件池选择器<br> 选择线程池的逻辑为:<br><font color="#0076b3"> executors[Math.abs(idx.getAndIncrement() % executors.length)];</font><br>
6. 创建一个NioEventLoop关闭监听器terminationListener <br>
7. 为每一个NioEventLoop添加./6中的关闭监听器。<br> <font color="#0076b3">e.terminationFuture().addListener(terminationListener);</font><br>
8. 将所有的NioEventLoop添加到一个 LinkedHashSet属性readonlyChildren 中<br>
2. 和./1中一样,创建一个NioEventLoopGroup对象<br><font color="#0076b3"> </font>在客户端connect服务器端时,会触发服务器端的accept监听,服务器端会创建一个NioSocketChannel对象<br> 然后从workerGroup中取出一个NioEventLoop和该NioSocketChannel对象进行绑定,同一个NioEventLoop中可能会绑定多个NioSocketChannel对象<br> 当NioEventLoop中的选择器有新的read事件到来时会通知相应的NioSocketChannel<font color="#0076b3"><br> EventLoopGroup workerGroup = new NioEventLoopGroup();</font><br>
3. 实例化一个ServerBootstrap对象<br> <font color="#0076b3">ServerBootstrap b = new ServerBootstrap();</font><br>
4. 将./1中的bossGroup赋值给ServerBootstrap#group属性,将./2 workerGroup赋值给ServerBootstrap#childGroup属性<br> <font color="#0076b3">b.group(bossGroup, workerGroup)</font><br>
5. 通过传入的class构建一个ReflectiveChannelFactory赋值给ServerBootstrap#channelFactory属性用于后续实例化NioServerSocketChannel<br><font color="#0076b3"> b.channel(NioServerSocketChannel.class)</font><br>
6. 调用option方法设置一些TCP参数到一个LinkedHashMap属性options中<br> <font color="#0076b3">b.option(ChannelOption.SO_BACKLOG, 100)</font><br>
7. 设置ServerSocketChannel的handler<br> <font color="#0076b3">b.handler(new LoggingHandler(LogLevel.INFO))</font><br>
8. 设置SocketChannel 的handler,EchoServerHandler为自定义的一个handler<font color="#0076b3"><br>b.childHandler(new ChannelInitializer<SocketChannel>() {<br> @Override<br> public void initChannel(SocketChannel ch) {<br> ChannelPipeline p = ch.pipeline();<br> p.addLast(new EchoServerHandler());<br> }<br>});</font><br>
9. 将端口实例化为InetSocketAddress对象传入bind方法进行调用<br> <font color="#0076b3"> ChannelFuture f = b.bind(9999).sync();</font><br>
1. 做一些校验工作,如group,channelFactory,childHandler这些值不能为空<br><font color="#0076b3"> validate();</font><br>
2. 调用doBind方法<br> <font color="#0076b3">return doBind(localAddress);</font><br>
1. 初始化并注册ServerSocketChannel,并返回一个DefaultChannelPromise对象<br> 因为在初始化的时候会将一些任务交给一个EventLoop线程异步执行<br> 所以需要返回Future对象进行判断任务执行的情况<br> <font color="#0076b3">final ChannelFuture regFuture = initAndRegister();</font><br>
1. 通过#5中设置的channelFactory创建NioServerSocketChannel实例<br> 会调用NioServerSocketChannel的无参构造<br> <font color="#0076b3"> channel = channelFactory.newChannel();</font><br>
1. 调用父类AbstractNioMessageChannel的构造方法<br> <font color="#0076b3">super(null, channel, SelectionKey.OP_ACCEPT);</font><br>
1. 调用父类AbstractNioChannel的构造<br> <font color="#0076b3">super(parent, ch, readInterestOp);</font><br>
1. 调用父类AbstractChannel的构造<br> <font color="#0076b3"> super(parent);</font>
1. 设置父channel,此处为null<br> <font color="#0076b3">this.parent = parent;</font><br>
2. 通过DefaultChannelId工厂类创建channelID<br> <font color="#0076b3">id = newId();</font><br>
3. 调用AbstractNioMessageChannel#newUnsafe方法实例化一个NioMessageUnsafe对象赋值给unsafe属性<br> <font color="#0076b3">unsafe = newUnsafe();</font><br>
4. 调用newChannelPipeline方法实例化DefaultChannelPipeline对象赋值给pipeline<br> <font color="#0076b3">pipeline = newChannelPipeline();</font><br>
2. 将创建的ServerSocketChannel赋值给ch属性<br> <font color="#0076b3">this.ch = ch;</font><br>
3. 将事件类型存储在readInterestOp属性,此处是OP_ACCEPT事件<br> <font color="#0076b3">this.readInterestOp = readInterestOp;</font><br>
4. 设置ServerSocketChannel为非阻塞模式<br> <font color="#0076b3">ch.configureBlocking(false);</font><br>
2. 实例化NioServerSocketChannelConfig对象赋值给config<br> <font color="#0076b3">config = new NioServerSocketChannelConfig(this, javaChannel().socket());</font><br>
2. 对NioServerSocketChannel执行一些初始化操作<br> <font color="#0076b3"> init(channel);</font><br>
1. 将#6中设置的选项设置到NioServerSocketChannelConfig中<br> <font color="#0076b3">channel.config().setOptions(options);</font>
2. 如果有设置属性,将设置的属性循环添加到NioServerSocketChannel中<br> <font color="#0076b3"> channel.attr(key).set(e.getValue());</font>
3. 获取NioServerSocketChannel中的ChannelPipeline(../1.1.1.1.4有设置)<br> <font color="#0076b3">ChannelPipeline p = channel.pipeline();</font>
4. 将childGroup,childHandler,childOptions,childAttrs四个变量的取出,供下面addLast使用<br>
5. 调用DefaultChannelPipeline#addLast方法将一个ChannelInitializer对象添加到链表中<br> <b> 关于ChannelInitializer:</b><br> ChannelInitializer是继承自ChannelInboundHandlerAdapter的抽象类,类中有定义一个抽象initChannel方法<br> 在执行ChannelInitializer#handlerAdded时会调用initChannel(ChannelHandlerContext ctx)方法,该方法中<br> 会调用抽象方法initChannel,然后在执行完该方法后会将ChannelInitializer从DefaultChannelPipeline链表中移除
1. 对handler做些检查,如是否在没有@Sharable注解的情况下重复注册<br> <font color="#0076b3">checkMultiplicity(handler);</font><br>
2. 调用newContext方法将handler包装为一个DefaultChannelHandlerContext对象<br> <font color="#0076b3">newCtx = newContext(group, filterName(name, handler), handler);</font><br>
3. 调用addLast0方法将newCtx添加到DefaultChannelPipeline链表中<br> 在实例化DefaultChannelPipeline时就初始化好了head和tail<br> <font color="#0076b3">addLast0(newCtx);</font><br>
添加后的链表结构
如果DefaultChannelPipeline#registered为false<br>(此处满足条件,在#../../3.1.1.1.6.4.3处回调的时候才会设置为true)<br>
4. 设置DefaultChannelHandlerContext对象状态为ADD_PENDING<font color="#0076b3"><br> newCtx.setAddPending();</font><br>
5. 设置事件回调<font color="#0076b3"><br> callHandlerCallbackLater(newCtx, true);</font><br>
1. 此处added为true,会实例化一个PendingHandlerAddedTask赋值给task,该Task主要是执行callHandlerAdded0(在#9.2.1.3.1.1.1.6.4.3.3处进行回调)<br> <font color="#0076b3"> PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);</font>
2. 如果pendingHandlerCallbackHead属性值为null则将task赋值给pendingHandlerCallbackHead<br>
3. 否则将task作为pendingHandlerCallbackHead的尾结点,PendingHandlerCallback本身是一个单向链表<br>
6. 返回当前DefaultChannelPipeline<br> <font color="#0076b3"> return this;</font>
如果DefaultChannelPipeline#registered为true<br>
会立即调用callHandlerAdded0方法,即立即调用handler#handlerAdded方法<br>也就是说handlerAdded方法的触发要等NioServerSocketChannel注册到选中器完成后
3. 调用#1 中创建的bossGroup对象的register方法<br> <font color="#0076b3"> ChannelFuture regFuture = config().group().register(channel);</font>
1. 通过#1.5创建的chooser对象的next方法从事件池数组中<br> 取出第一个NioEventLoop对象调用register方法<br> <font color="#0076b3">return next().register(channel)</font><br>
1. 先实例化一个DefaultChannelPromise对象<br> 然后调用父类SingleThreadEventLoop#register<br> <font color="#0076b3">return register(new DefaultChannelPromise(channel, this));</font><br>
1. 最终会调用#9.2.1.1.1.1.1.3中创建的NioMessageUnsafe#register方法<font color="#0076b3"><br> promise.channel().unsafe().register(this, promise);</font><br>
1. eventLoop不能为空,即前面传入进来的this<br>
2. NioServerSocketChannel#registered为true则设置异常到promise中并返回<br> <font color="#0076b3">promise.setFailure(new IllegalStateException("..."));</font>
3. 如果eventLoop不为NioEventLoop类型则设置异常到promise中并返回<br><font color="#0076b3"> promise.setFailure(new IllegalStateException("..."));</font>
4. 将传入的eventLoop赋值给NioServerSocketChannel#eventLoop属性<br> <font color="#0076b3">AbstractChannel.this.eventLoop = eventLoop;</font><br>
5. 如果当前线程就是eventLoop线程池中的那个线程(此处不满足该条件,因为eventLoop中的线程要在./6.3.1中才启动),则由当前线程立即执行register0方法<br> 否则进入./6通过eventLoop#execute执行register0方法<br> <font color="#0076b3"> register0(promise);</font><br>
6. 通过eventLoop.execute执行register0()方法<br> <font color="#0076b3"> eventLoop.execute(new Runnable() {<br> public void run() {<br> register0(promise);<br> }<br> });</font>
1. 判断当前线程是否为事件池中的那个线程<br><font color="#0076b3"> boolean inEventLoop = inEventLoop();</font>
如果inEventLoop为true(不满足)
2. 则将执行register0方法的任务添加到eventLoop的任务队列中(#1.4.1.5中创建的那个任务队列)<br> <font color="#0076b3"> addTask(task);</font>
如果inEventLoop为false(满足)
3. 启动线程<br> <font color="#0076b3">startThread();</font><br>
如果state为ST_NOT_STARTED状态且修改成ST_STARTED状态成功<br>(state值由原子类进行维护,有五个值:<br> ST_NOT_STARTED = 1; //线程未启动,为初始值<br> ST_STARTED = 2; //线程已启动<br> ST_SHUTTING_DOWN = 3; //线程停止中<br> ST_SHUTDOWN = 4; //线程已停止<br> ST_TERMINATED = 5; //线程已死亡)
1. 该方法会通过eventLoop中的单线程池对象executor的execute方法执行一个Runnable任务<br> <font color="#0076b3"> doStartThread();</font><br>
1. 将eventLoop线程池中的那个线程保存到eventLoop的thread属性中<br> inEventLoop方法中就是判断这个thread线程是否等于当前线程<br> <font color="#0076b3"> thread = Thread.currentThread();</font><br>
2. 如果interrupted属性值为true,则中断当前线程
3. 执行EventLoop#run方法,该run方法中是一个死循环<br> 会重复的干着右侧1,4,5三件事情<br> <font color="#0076b3">SingleThreadEventExecutor.this.run();</font><br>
1. 将selectStrategy.calculateStrategy返回值作为switch的条件执行不同的操作<br> 主要是判断taskQueue或tailTasks中是否有任务<br> 有任务的话将执行selectNow()返回0进入switch的defalut(什么也不干)<br> 没有的话会返回-1<br>
1. 值为-2<br>
跳出循环
2. 值为-1<br>
执行select方法,该方法中又是一个循环,重复着右侧的操作<br><font color="#0076b3">select(wakenUp.getAndSet(false));</font><br>
1. 定时任务执行时间要到了,退出循环。<br> 通过下一个定时任务时间执行时间来计算select的阻塞时间,保证定时任务能及时执行<br> 在没有定时任务的情况下默认为下一个任务将在1S后执行,所以默认的select的阻塞时间为1s<br>
2. 如果任务队列中有新的任务进来了,退出循环
3. 阻塞1s中,./1中有计算阻塞时间<br> <font color="#0076b3"> int selectedKeys = selector.select(timeoutMillis);</font><br>
4. selectCnt值加1,该值用来计算selector阻塞次数<br> <font color="#0076b3"> selectCnt ++;</font><br>
5. 符合右侧中的一个条件,退出循环<br>
1. ./3中的返回值不为0,说明有事件就绪<br><font color="#0076b3"> selectedKeys != 0</font><br>
2. oldWakenUp 参数为true<br>
3. 用户主动唤醒<br> <font color="#0076b3">wakenUp.get()</font>
4. 任务队列里面有任务<br><span style="font-size: inherit;"> <font color="#0076b3"> hasScheduledTasks()</font></span><br>
5. 第一个定时任务即将要被执行<br> <font color="#0076b3"> hasScheduledTasks()</font><br>
6. 线程被中断,退出循序
7. 解决jdk的nio bug,<br> <font color="#0076b3">rebuildSelector();</font><br>
bug描述
poll和epoll对于突然中断的连接socket会对返回的eventSet事件集合置为POLLHUP或者POLLERR,eventSet事件集合发生了变化,这就导致Selector会被唤醒,进而导致CPU 100%问题。根本原因就是JDK没有处理好这种情况,比如SelectionKey中就没定义有异常事件的类型。
netty解决
通过判断每次select操作是否至少持续了timeoutMillis秒,有的话重置./4的selectCnt值<br>如果selectCnt的值超过阀值(默认512),则说明连续多次select少于timeoutMillis秒,可能是出现空轮询了<br>就会将之前注册到老的selector上的的channel重新转移到新的selector上<br>
3. default<br>
什么也不干
如果this.ioRatio==100,表示执行runAllTasks()没有最大时间限制
2. <font color="#0076b3">processSelectedKeys(); </font><br>
3.<font color="#0076b3"> runAllTasks(); </font><br>
如果this.ioRatio!=100(默认为50)<br>
4. 该方法中会根据selectedKeys的值是否为null判断是否调用processSelectedKeysOptimized<br> 在实例化NioEventLoop的时候是有初始化selectedKeys的,所以会调用processSelectedKeysOptimized<br> 在processSelectedKeysOptimized方法中会迭代 selectedKeys 获取就绪的 IO 事件<br> 然后为每个事件都调用 processSelectedKey 来处理它<br> <font color="#0076b3">processSelectedKeys();</font><br>
1. 循环调用processSelectedKey方法进行处理就绪事件<br> 各个事件的触发时机请查看"NIO的SelectionKey类型部分"<br><font color="#0076b3"> processSelectedKey(k, (AbstractNioChannel) a);</font><br>
1. 处理OP_CONNECT事件<br> <font color="#0076b3"> unsafe.finishConnect();</font>
2. 处理OP_WRITE事件<br> <font color="#0076b3">ch.unsafe().forceFlush();</font>
3. 处理OP_ACCEPT 和 OP_READ事件<br> 不同的对象unsafe对象值是不一样的,accept时是NioMessageUnsafe,read时是NioByteUnsafe<br> <font color="#0076b3"> unsafe.read();</font><br>
OP_ACCEPT
在该方法中会创建NioSocketChannel对象,然后将该对象作为入参调用DefaultChannelPipeline链中的channelRead方法<br>最终会调用到ServerBootstrapAcceptor#channelRead,进行NioSocketChannel的事件注册操作<br>
OP_READ
5. <font color="#0076b3">runAllTasks(ioTime * (100 - ioRatio) / ioRatio);</font><br>
1. 从scheduledTaskQueue转移定时任务到taskQueue<br> 每次 pollScheduledTask 的时候,只有在当前任务的执行时间已经到了,才会取出来<br> <font color="#0076b3">fetchFromScheduledTaskQueue();</font><br>
2. 取出队列中的第一个Runnable任务<br> <font color="#0076b3"> Runnable task = pollTask();</font>
3. 如果没有任务则执行完afterRunningAllTasks方法就return了<br> afterRunningAllTasks方法中主要是循环执行tailTasks队列中的任务<br><font color="#0076b3"> afterRunningAllTasks();</font><br>
NioEventLoop可以通过executeAfterEventLoopIteration方法向tailTasks中添加收尾任务<br>比如,你想统计一下一次执行一次任务循环花了多长时间就可以调用此方法<br>
4. taskQueue中有任务的话会循序调用safeExecute方法执行taskQueue队列中的任务<br> safeExecute方法中会调用任务的run方法进行执行任务逻辑<br> 每执行64个任务判断一下是不是到传入的最大执行时间了,到了的话就跳出循环<br> <font color="#0076b3"> safeExecute(task);</font><br>
5. 和./3中的afterRunningAllTasks方法逻辑一致<br> <font color="#0076b3">afterRunningAllTasks();</font>
4. 和./2中一样,将执行register0方法的任务添加到taskQueue任务队列中<br> 这个任务最终会在./3.1.3.5.4处执行<br> <font color="#0076b3"> addTask(task);</font><br>
1. 该方法中主要将ServerSocketChannel注册到selector上,只不过注册的事件值为0,表示对任何事件都不感兴趣<br> 同时在注册的时候会将NioServerSocketChannel对象作为attach参数传了过去供后面的事件处理使用<br> <font color="#0076b3">doRegister();</font><br>
2. 设置NioServerSocketChannel#registered为true<br> <font color="#0076b3"> registered = true;</font>
3. 调用DefaultChannelPipeline#invokeHandlerAddedIfNeeded<br> 该方法主要是用来在ServerSocketChannel注册到selector上后执行<br> DefaultChannelPipeline#pendingHandlerCallbackHead回调用的,只会调用一次<br> <font color="#0076b3"> pipeline.invokeHandlerAddedIfNeeded();</font><br>
1. 将DefaultChannelPipeline#registered设置为true<br> <font color="#0076b3"> registered = true;</font><br>
2. 将单向链表DefaultChannelPipeline#pendingHandlerCallbackHead存在临时变量task <br> 然后设置DefaultChannelPipeline#pendingHandlerCallbackHead为null<br> <font color="#0076b3"> this.pendingHandlerCallbackHead = null;</font><br>
3. 循环调用单向链表节点的execute方法,此处链表中应该只有一个节点<br> 就是PendingHandlerAddedTask,所以会调用PendingHandlerAddedTask#execute<br> <font color="#0076b3">while (task != null) {<br> task.execute();<br> task = task.next;<br> }</font><br>
1. 取出NioServerSocketChannel#eventLoop,在#9.2.1.3.1.1.4有设置<br> EventExecutor executor = ctx.executor();<br>
如果当前线程是executor中的那个单线程(此处满足)<br>
2. 直接执行callHandlerAdded0方法<br> <font color="#0076b3">callHandlerAdded0(ctx);</font><br>
1. 该方法最终会通过<font color="#0076b3">initChannel((C) ctx.channel());</font>方式<br> 调用#9.2.1.2.5中addLast的那个ChannelInitializer#initChannel方法<br> 此处再次贴出那块的代码进行分析下<br> <font color="#0076b3"> ctx.handler().handlerAdded(ctx);</font><br>
1. 从NioServerSocketChannel取出pipeline,即DefaultChannelPipeline<br> <font color="#0076b3">final ChannelPipeline pipeline = ch.pipeline();</font>
2. ServerBootstrapConfig中取出设置的handler(即#7中设置的LoggingHandler)<br> 通过pipeline.addLast(handler);添加到DefaultChannelPipeline链表尾结点的前一节点<br> <font color="#0076b3">ChannelHandler handler = config.handler();</font><br>
3. 向eventLoop中添加了一个任务,该任务就是向pipeLine链表中addLast一个ServerBootstrapAcceptor对象<br> ServerBootstrapAcceptor对象的构造中传入在main方法中为childGroup的设置的一些配置,如自定义hander<br> 在有客户端请求服务器连接时会触发服务器accept事件,最终会执行服务器DefaultChannelPipeline链中inbound类型hander的channelRead方法<br> 就会执行到ServerBootstrapAcceptor#channelRead方法,该方法中会通过childGroup中的线程去执行NioSocketChannel的注册操作<br> 在注册完成后会在HeadContext#channelActive中设置对read感兴趣(BossGroup是在doBind中调用channelActive的,而此处是在注册完后直接调用的,因为此时端口已绑定,isActive()方法返回true,具体查看#9.2.1.3.1.1.1.6.4.6)<br>
2. 更新handlerState状态为ADD_COMPLETE<br> <font color="#0076b3"> ctx.setAddComplete();</font>
如果当前线程不是executor中的那个单线程(此处不满足)
3. 将执行callHandlerAdded0方法作为任务添加到eventLoop中进行执行<br> 说白了就是只能使用eventLoop中的线程执行任务callHandlerAdded0<br> <font color="#0076b3">callHandlerAdded0(ctx);</font><br>
4. 该方法中主要是执行promise.trySuccess()操作<br> <font color="#0076b3"> safeSetSuccess(promise);</font><br>
1. cas设置DefaultPromise#result属性值为DefaultPromise#SUCCESS<br> 如果成功后DefaultPromise#waiter>0还会执行<font color="#0076b3">notifyAll();</font>唤醒阻塞在该DefaultPromise对象上的锁<br> <font color="#0076b3">setSuccess0(result)</font><br>
2. 通知监听器执行回调方法<br> <font color="#0076b3"> notifyListeners();</font><br>
5. 从头结点HeadContext开始,调用DefaultChannelPipeline链中inbound类型handler的channelRegistered方法<br> <font color="#0076b3">pipeline.fireChannelRegistered();</font><br>
6. 通过isActive()方法判断判断服务器端口是不是绑定了,在客户端连接服务器端触发ServerBootstrapAcceptor注册NioSocketChannel时服务器端口已经绑定好了,这个时候满足条件,会执行pipeline.fireChannelActive(),fireChannelActive()中最终会设置对读感兴趣事件<br>
5. 如果eventLoop#state值>=4,则移除该task并执行拒绝策略<br> <font color="#0076b3"> if (isShutdown() && removeTask(task)) {<br> reject();<br> }</font><br>
6. 如果同时满足右侧添加执行selector.wakeup();(此处满足这些条件)<br>
addTaskWakesUp值为false<br>
task不是NonWakeupRunnable类型
inEventLoop为false<br>
原子类属性wakenUp成功从false修改为true
2. 最终返回promise<br> <font color="#0076b3"> return promise;</font>
4. <font color="#0076b3">return regFuture</font>
2. 从./1返回的reFuture中取出初始化好的NioServerSocketChannel<br> <font color="#0076b3"> final Channel channel = regFuture.channel();</font><br>
3. 判断./1 中初始化注册的过程中是否有发生异常,有则直接返回regFuture
如果regFuture.isDone()为true<br> 如果为true说明ServerSocketChannel已经初始化注册完成则立即执行doBind0操作<br> isDone通过判断DefaultPromise#result属性值判断Future是否完成<br> 在./1.3.1.1.1.6.4.4.1会将该值修改为SUCCESS完成且成功<br>
4. 通过channel创建一个新的DefaultChannelPromise对象<br> 因为后续的bind操作依据是在那个EventLoop线程中以异步任务的形式进行执行<br> 所以通过这个新建的promise在主线程中判断执行情况<br> <font color="#0076b3"> ChannelPromise promise = channel.newPromise();</font><br>
5. 在doBind0方法中主要是提交了一个执行绑定操作的异步任务<br> <font color="#0076b3"> doBind0(regFuture, channel, localAddress, promise);</font><br>
1. 该方法调用DefaultChannelPipeline#bind,DefaultChannelPipeline#bind方法会从TailContext开始往前面查找<br> outbound类型的handler,调用bind方法,最终会执行HeadContext#bind(和./1.3.1.1.1.6.4.5有些类似)<br> 此处添加了一个CLOSE_ON_FAILURE监听器用来在promise执行失败的情况下关闭ServerSocketChannel<br> <font color="#0076b3"> channel.bind(localAddress, promise)<br> .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);</font><br>
1. 通过NioMessageUnsafe#bind执行绑定操作<br> <font color="#0076b3"> unsafe.bind(localAddress, promise);</font><br>
1. 对ServerSocketChannel执行绑定操作<br> <font color="#0076b3">doBind(localAddress);</font><br>
2. 此处又提交了一个异步任务,该任务中会调用DefaultChannelPipeline#fireChannelActive方法<br> <font color="#0076b3"> </font>该方法中会调用HeadContext#channelActive方法<font color="#0076b3"><br> public void run() {<br> pipeline.fireChannelActive();<br> }</font><br>
1. 从HeadContext开始调用DefaultChannelPipeline链表中inbound类型hander的channelActive方法<br> <font color="#0076b3"> ctx.fireChannelActive();</font><br>
2. 该方法中会从TailContext开始调用DefaultChannelPipeline链表中outbound类型hander的read方法<br> 最后中会调用到HeadContext#read方法<br> <font color="#0076b3">readIfIsAutoRead();</font><br>
1. 调用AbstractUnsafe#beginRead方法,该方法中将会执行doBeginRead方法,主要是将<br> ServerSocketChannel的selectionKey值修改为16(在#9.2.1.3.1.1.1.6.4.1注册的值为0),表示对OP_ACCEPT事件感兴趣<br><font color="#0076b3"> unsafe.beginRead();</font><br>
3. 设置DefaultPromise#result为SUCCESS表示执行成功<br> <font color="#0076b3"> safeSetSuccess(promise);</font><br>
6. 返回新创建的promise,通过<br> <font color="#0076b3"> return promise;</font><br>
如果regFuture.isDone()为false<br> 如果为false则往regFuture注册一个监听器<br> 在./1.3.1.1.1.6.4.4.2会回调该监听器执行doBind0操作,监听器中执行的操作基本上和上面类似
客户端启动<br>
5. 客户端启动其他步骤和服务器端启动大同小异,此处只分析connetct<br> 下面的方法最终会调用doResolveAndConnect<br> <font color="#0076b3">ChannelFuture f = b.connect("localhost", 9999).sync();</font><br>
1. 和服务器端#9.2.1逻辑相似<br> 只不过这里是初始化NioSocketChannel并注册SocketChannel到选择器中<br> <font color="#0076b3">final ChannelFuture regFuture = initAndRegister();</font><br>
2. 取出channle<br> <font color="#0076b3"> final Channel channel = regFuture.channel();</font>
3. 和服务器端一样,future中的初始化注册操作已经完成<br> 则直接执行接下面的操作,否则添加监听器在监听器中执行<br> <font color="#0076b3">return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());</font><br>
1. 获取在iniAndRegister中为channe中分配的那个eventLoop<br> <font color="#0076b3"> final EventLoop eventLoop = channel.eventLoop();</font><br>
2. 通过上面的eventLoop创建一个InetSocketAddressResolver对象,并缓存到一个Map对象resolvers中<br> key为这个eventLoop<br> <font color="#0076b3"> AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);</font><br>
3. 通过上面的resolver判断remoteAddress是不是已经resolve过了,没有的话先resolve<br> 这个resolve的过程是异步的,在这个异步操作执行后才会调用doConnect<br> <font color="#0076b3">doConnect(remoteAddress, localAddress, promise);</font><br>
1. 从connectPromise中取出channel,这个promise是#5.3中通过channel创建的,里面有保存channel的引用<br> <font color="#0076b3">final Channel channel = connectPromise.channel();</font>
2. 调用AbstractChannel#connect方法,最终会调用TailContext#connect<br> <font color="#0076b3"> channel.connect(remoteAddress, localAddress, connectPromise);</font><br>
1. TailContext#connect中会从TailContext开始往前面查找<br> outbound类型的handler,最终会调用HeadContext#connect<br> HeadContext#connect中会调用AbstractNioChannel#connect<br> <font color="#0076b3">return tail.connect(remoteAddress, promise);</font><br>
1. 调用doConnect方法,该方法中会通过SocketChannel发送connect请求<br> 如果connect的结果为false(由于是非阻塞IO,一般会立即返回false)<br> 则修改selectionKey#interestOps为8(之前是0),表示等待SelectionKey.OP_CONNECT事件就绪<br> 在三次握手后会触发OP_CONNECT事件就绪,与此同时服务器端OP_ACCEPT事件也会就绪(#9.2.1.3.1.1.1.6.3.1.3.4.1)<br> <font color="#0076b3">doConnect(remoteAddress, localAddress)</font><br>
如果上面的connect返回true<br>
2. 立即执行下面的方法<br> (即使此处connect没有立即返回true,在超时时间内监听到connect事件后也会调用该方法)<br> <font color="#0076b3">fulfillConnectPromise(promise, wasActive)</font><br>
1. 设置promise为Success,然后执行注册在promise上的监听器<br> <font color="#0076b3">boolean promiseSet = promise.trySuccess();</font><br>
2. 和服务器端#9.2.5.1.1逻辑类似,只不过这里是监听read事件,而服务器端监听的是accept<br> <font color="#0076b3"> pipeline().fireChannelActive();</font><br>
1. 执行执行DefaultChannelPipeline链表中的inbound类型handler的channelActive方法<br> <font color="#0076b3">ctx.fireChannelActive();</font><br>
2. 调用DefaultChannelPipeline链表中outbound类型hander的read方法<br> 最终会调用到HeadContext#read设置selectionKey#interestOps为1,监听读事件<br> <font color="#0076b3"> readIfIsAutoRead();</font><br>
如果没有connect返回false
3. 添加一个延时任务进行处理超时的情况(在超时时间内监听到connect事件后会关闭该超时任务,具体可查看AbstractNioChannel#finishConnect的finally代码)<br> 通过还添加了一个监听器用来在promise取消的时候关闭这个延时任务<br>
3. 在connectPromise中添加一个监听器在connect失败后关闭NioSocketChannel<br> <font color="#0076b3">connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);</font><br>
NioSocketChannel读操作分析
1. 取出NioSocketChannelConfig<br> <font color="#0076b3"> final ChannelConfig config = config();</font><br>
2. 取出NioSocketChannel中的DefaultChannelPipeline<br> <font color="#0076b3"> final ChannelPipeline pipeline = pipeline();</font><br>
3. 从NioSocketChannelConfig中取出allocator属性,该属性值为<font color="#0076b3">ByteBufAllocator.DEFAULT</font>而<font color="#0076b3">ByteBufAllocator.DEFAULT</font>的值为<font color="#0076b3">ByteBufUtil.DEFAULT_ALLOCATOR<br></font> 在没有设置系统属性“io.netty.allocator.type”的情况下只要不是Android环境allocator值为PooledByteBufAllocator对象<br> <font color="#0076b3"> final ByteBufAllocator allocator = config.getAllocator();</font><br>
4. recvBufAllocHandle()方法最终获取的对象为AdaptiveRecvByteBufAllocator#HandleImpl<br> 该对象主要是用来负责自适应调整当前缓存分配的大小,以防止缓存分配过多或过少<br> <font color="#0076b3">final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();</font><br>
在AdaptiveRecvByteBufAllocator中保存着一个长度53的整型数组SIZE_TABLE<br>里面的值为从16起下一个元素加16一直到496,然后从512起,以一个元素*2一直到整型溢出供53个元素<br>在AdaptiveRecvByteBufAllocator中还保存着DEFAULT_MINIMUM,DEFAULT_INITIAL,DEFAULT_MAXIMUM三个属性值分别为64,1024,65536<br>HandleImpl在实例化的时候会为属性minIndex,maxIndex,index,nextReceiveBufferSize赋值,前三个取值就是64,1024,65536在SIZE_TABLE的下标位置<br>nextReceiveBufferSize取值就是SIZE_TABLE[index]即1024,这个值将作为#7中申请内存时的初始内存<br>
5. 为allocHandle设置 maxMessagePerRead,totalMessages,totalBytesRead三个属性的值,这三个值是在HandleImpl的父类MaxMessageHandle中定义的<br> 其中maxMessagesPerRead的值是在NioSocketChannelConfig父类DefaultChannelConfig的构造中创建AdaptiveRecvByteBufAllocator对象的时候设置进去的<br> <font color="#0076b3">allocHandle.reset(config);</font><br>
6. <font color="#0076b3">ByteBuf byteBuf = null;</font><br>
do while循环读取Socket缓冲区的数据<br>
7. 调用allocHandle#allocHandle方,该方法中通过allocator对象申请内存空间<br> 初始内存空间大小为nextReceiveBufferSize的值(一开始默认为1024)<br> <font color="#0076b3">byteBuf = allocHandle.allocate(allocator);</font><br>
8. doReadBytes(byteBuf)就是从SocketChannel中读取数据到byteBuf,并返回读取的字节个数<br> 然后将读取的字节个数作为入参调用allocHandle#lastBytesRead,lastBytesRead方法中会设置lastBytesRead为读取字节个数<br> 同时累加到totalBytesRead变量中,如果该变量溢出了,则重置为Integer.MAX_VALUE<br> <font color="#0076b3"> allocHandle.lastBytesRead(doReadBytes(byteBuf));</font><br>
9. 如果lastBytesRead值小于等于0,则执行完右侧操作后跳出循环<br> <font color="#0076b3"> allocHandle.incMessagesRead(1);</font><br>
byteBuf#release释放申请的内存
设置byteBuf为null帮助GC回收
如果lastBytesRead值小于0则标识close变量为true<br>说明请求方可能关闭了,在#14中进行进一步处理<br>
10. 累加totalMessages变量<br> <font color="#0076b3"> allocHandle.incMessagesRead(1);</font><br>
11. 调用DefaultChannelPipeline链中的handler的channelRead方法<br> <font color="#0076b3">pipeline.fireChannelRead(byteBuf);</font><br>
12. 这一步主要是重新调整内存分配策略,比如说上面的while循环超过了 maxMessagePerRead然后跳出循环了<br> 那么此处就会将HandleImpl中的index将会后移(这样nextReceiveBufferSize值也就变大了)<br> 那么下一次触发read事件重新进入while读取剩下的那些数据时,#7处申请的内存大小就会调整成一个比1024更大的值<br> <font color="#0076b3">allocHandle.readComplete();</font><br>
13. 调用DefaultChannelPipeline链中的handler的channelReadComplete方法<br> 当请求的传输的数据比较大时上面的while循环可能会执行多次,也就是说#11处的channelRead方法可能会被多次调用<br> 这个时候可以通过在channelRead方法中将请求数据进行缓存,在channelReadComplete方法再进行处理<br> 另一种比较好的方式就是继承ByteToMessageDecoder重写decode方法<br> 因为在ByteToMessageDecoder#channelRead中子类decode中没有读取掉ByteBuf中的数据的话会被缓存下来<br> <font color="#0076b3">pipeline.fireChannelReadComplete();</font><br>
14. 如果上面#9中读取的数据小于0,会执行closeOnRead方法,该方法中会判断SocketChannel是不是打开的<br> 是打开的且没有设置ChannelOption.ALLOW_HALF_CLOSURE为true就执行关闭操作<br> <font color="#0076b3">if (close) {<br> closeOnRead(pipeline);<br> }</font><br>
NioSocketChannel写操作分析<br>
1. 该方法会调用DefaultChannelPipeline链中上一个outbound类型的hander的write方法<br> 最终会执行到HeadContext#write方法,该方法中最终调用的AbstractUnsafe#write<br><span style="font-size: inherit;"> <font color="#0076b3"> ChannelHandlerContext#write(msg);</font></span><br>
1. 取出AbstractUnsafe#outboundBuffer属性,该属性是在创建NioSocketChannel时<br> 实例化NioSocketChannelUnsafe对象时创建的,值为ChannelOutboundBuffer<br> ChannelOutboundBuffer是一个单向链表,链表中的节点是ChannelOutboundBuffer#Entry对象<br><font color="#0076b3"> </font> <font color="#0076b3">ChannelOutboundBuffer outboundBuffer = this.outboundBuffer</font><br>
2. 如果传入的msg不是ByteBuf或FileRegion类型会抛出UnsupportedOperationException异常<br> 如果是ByteBuf类型不是非堆内存会转化成非堆内存<br> <font color="#0076b3"> msg = filterOutboundMessage(msg);</font><br>
3. 计算msg的字节数赋值给size属性,如果赋值后的size小于0则修改为0<br> <font color="#0076b3">size = pipeline.estimatorHandle().size(msg);</font><br>
4. msg添加到outboundBuffer单向链表中<br> <font color="#0076b3"> outboundBuffer.addMessage(msg, size, promise);</font><br>
1. 将msg, size, promise构建成Entry,这个newInstance中使用了Recycler类,使用到了对象池复用技术<br> <font color="#0076b3">Entry entry = Entry.newInstance(msg, size, total(msg), promise);</font><br>
2. 将entry添加到链表的尾节点
3. ChannelOutboundBuffer中有个AtomicLongFieldUpdater类型的属性totalPendingSize用来存储写入到ChannelOutboundBuffer中的字节总数<br> 如果总字节数超过默认的64KB,则设置通过CAS修改unwritable标志位的值为1(表示通道不可写了默认为0),CAS修改成功后会执行<br> DefaultChannelPipeline链中的channelWritabilityChanged方法,所以可以在channelWritabilityChanged方法中通过ctx.channel().isWritable()<br> 获取unwritable的值控制数据的写入<br> <font color="#0076b3">incrementPendingOutboundBytes(size, false);</font><br>
2. ./1中只是将msg写入到单向链表对象ChannelOutboundBuffer中,这一步才是写入到Socket发送出去<br> 和./1一样,最终会进入AbstractUnsafe类,只不过这里是调用AbstractUnsafe#flush方法<br> <font color="#0076b3">ChannelHandlerContext#flush();</font><br>
1. <font color="#0076b3">ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;</font><br>
2. 首先拿到outboundBuffer链表中未刷新的头结点unflushedEntry 赋值给 flushedEntry<br> 然后循环这些节点尝试设置节点的promise#result属性为UNCANCELLABLE(表示不能执行promise取消了) <br> 如果设置失败说明已经调用promise取消方法了,这时需要将节点取消掉,并将outboundBuffer中totalPendingSize值减去该节点的字节数<br> 如果减去后的totalPendingSize值小于默认最小总字节数,也会触发channelWritabilityChanged方法调用<br> <font color="#0076b3"> outboundBuffer.addFlush();</font><br>
3. 执行刷新操作<br> <font color="#0076b3">flush0();</font><br>
1. 如果已经注册了OP_WRITE事件,就直接返回了(一般不会自己去注册OP_WRITE事件,Socket绝大部分情况下是可以写的,因此注册此事件的作用就不大了)<br> <font color="#0076b3"> if (isFlushPending()) {<br> return;<br> }</font><br>
2. 调用父类AbstractUnsafe的flush0方法,该方法中会将outboundBuffer中缓存的数据通过通过socket传输给对端<br> <font color="#0076b3">super.flush0();</font><br>
0 条评论
下一页