Netty源码分析
2022-10-20 11:48:22 44 举报
AI智能生成
Netty是一个高性能、异步事件驱动的网络应用框架,用于快速开发可维护的高性能协议服务器和客户端。它提供了一套丰富的API,支持多种传输协议,如TCP、UDP和HTTP等。Netty的核心组件包括Channel、EventLoop、ChannelHandler等,通过这些组件可以构建复杂的网络应用程序。Netty的源码结构清晰,易于阅读和理解,是学习和研究网络编程的优秀实践。
作者其他创作
大纲/内容
服务端启动<br>
创建ServerSocketChannel<br>
new NioServerSocketChannel()<br>
newSocket(PROVIDER) -> ServerSocketChannel<br>
this.readInterestOp = SelectionKey.OP_ACCEPT 保存关心的事件为Accept<br>
初始化ServerSocketChannel<br>
添加之后的结构<br>
注册selector<br>
AbstractChannel.this.eventLoop = eventLoop; 将分配到的EventLoop保存至chanel内部<br>
<span style="font-size: inherit;">将Netty-Channel内部的jdk-Channel注册至该EventLoop绑定的selector上,<span class="equation-text" contenteditable="false" data-index="0" data-equation="此时"><span></span><span></span></span>并不关心事件selectionKey = javaChannel().register(eventLoop().selector, 0, this);</span><br>
获取注册后的selectionKey作为Netty-Channel的成员变量
服务端口绑定<br>
dk-Channel绑定端口<br>javaChannel().bind(localAddress, config.getBacklog());<br>
绑定完毕,触发channelActive事件HeadContext ctx.fireChannelActive();
触发完毕,调用HeadContext.readIfIsAutoRead();
传播至HeadContext.read()-> unsafe.beginRead();-> doBeginRead() 修改SelectionKey感兴趣的事件为创建时保存的兴趣事件
创建时保存感兴趣事件,创建jdk-Channel并创建id, unsafe, pipeline<br>服务端初始化主要用于在pipeline添加ServerBootstrapAcceptor处理器<br>用于将监听到的客户端Channel注册至客户端Group中<br>
注册主要目的用于将register() -> Netty-Channel注册至EventLoop (表现行为为保存EventLoop引用)register0() -> jdk-Channel注册至selector上 (通过jdk的方式注册)
全部由HeadContext节点处理
NioEventLoop<br>
创建
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory())<br>
factory -> new FastThreadLocalThread(threadGroup, r, name)
execute -> threadFactory.newThread(command).start()
this.executor = ObjectUtil.checkNotNull(executor, "executor");<br>
taskQueue = newTaskQueue(this.maxPendingTasks);
selector = openSelector();
chooser = chooserFactory.newChooser(children);<br>
EventExecutorChooser.next()轮询选择EventLoop
启动
SingleThreadEventExecutor.execute(task)<br>
this.thread为null -> 非nio线程 -> doStartThread()
executor.execute()创建FastThreadLocalThread
thread = Thread.currentThread();<br>保存由executor分配的线程(FastThreadLocalThread)
SingleThreadEventExecutor.this.run(),在该线程中正式启动EventLoop<br>
addTask(task) -> taskQueue.offer(task) 添加此任务由eventloop执行<br>
执行逻辑<br>
检测IO事件和任务队列<br>
获取最新要执行的定时任务的deadline作为这次select的deadline<br>
hasTasks()为穿插任务,当出现穿插任务时,跳出这次select
selector.select(timeoutMillis) 进行阻塞式select
当检测到未实际阻塞并且超过selectCnt阈值512
触发空轮训bug -> rebuildSelector();
newSelector = openSelector();<br>
Register all channels to the new Selector.<br>
处理IO事件<br>
在默认优化的前提下,<br>SelectorImpl的selectedKeys和publicSelectedKeys已被反射替换为new SelectedSelectionKeySet(),<br>该实现通过数组的方式优化了HashSet
processSelectedKeysOptimized(selectedKeys.flip())<br>
processSelectedKey(SelectionKey k, AbstractNioChannel ch)<br>
任务执行<br>
循环获取scheduledTask.deadlineNanos() <= nanoTime的定时任务<br>即将需要执行的定时任务合并至taskQueue<br>
在deadline(ioRatio默认1:1)之内时循环执行taskQueue中的task
2 * cpu<br>AbstractBootstrap.doBind0() -> channel.eventLoop().execute -> channel.bind<br>当execute调用时,当分配到的EventLoop尚未启动时触发线程的启动执行<br>在绑定结束后,会触发pipeline的channelActive事件
实际阻塞的select操作未发生阻塞并超过阈值则重新构建selector<br>
当外部线程调用eventLoop或channel方法时<br>将外部线程执行的任务封装成task丢至EventLoop顺序执行
新连接接入<br>
新连接检测<br>
processSelectedKey(SelectionKey k, AbstractNioChannel ch)<br>
NioServerSocketChannel.doReadMessages()<br>调用accept方法获取jdk SocketChanne
buf.add(new NioSocketChannel(this, ch)) 包装成netty channel<br>
pipeline.fireChannelRead(readBuf.get(i))<br>allocHandle用来控制accept速率
NioSocketChannel的创建<br>
this.readInterestOp = SelectionKey.OP_READ 保存关心的事件为read<br>
ch.configureBlocking(false);<br>
javaSocket.setTcpNoDelay(true);<br>
Netty中Channel的分类<br>
Channel 层级关系
ChannelConfig 层级关系<br>
新连接分配NioEventLoop并注册Selector<br>
服务端channel初始化时触发Acceptor的添加<br>ServerBootstrap.init(Channel channel) -> pipeline.addLast(new ServerBootstrapAcceptor())<br>
检测到新连接时触发NioMessageUnsafe.read() -> pipeline.fireChannelRead(readBuf.get(i));<br>
触发ServerBootstrapAcceptor.channelRead(ChannelHandlerContext ctx, Object msg)<br>
Acceptor的channelRead逻辑<br>
child.pipeline().addLast(childHandler);<br>
对新连接设置options和attrs<br>
childGroup.register(channel)<br>新连接注册至workerGroup
next().register(channel)<br>通过chooser选择一个NioEventLoop进行注册
AbstractUnsafe.register(EventLoop eventLoop, final ChannelPromise promise)<br>doRegister() -> selectionKey = javaChannel().register(eventLoop().selector, 0, this)<br>此时注册至selector但并不关心事件
总结:在boss中的NioEventLoop检测到新连接,注册至worker中的NioEventLoop<br>
NioSocketChannel读事件注册<br>
入口:pipeline.fireChannelActive();<br>
HeadContext.channelActive(ChannelHandlerContext ctx)
HeadContext.readIfIsAutoRead()
HeadContext.unsafe.beginRead();
AbstractNioChannel.doBeginRead()<br>selectionKey.interestOps(this.readInterestOp)<br>传播结束回到头结点,此时实际设置感兴趣的事件
select检测到新连接<br>
processSelectedKey 处理jdk-客户端channel<br>此时创建Netty-channel保存感兴趣的事件
触发服务端读事件至ServerBootstrapAcceptor.channelRead<br>调用childGroup.register(child),通过choose分配EventLoop<br>Netty-Channel保存分配的EventLoop并将Channel保存的jdk-Channel注册至selector
注册完毕触发ChannelActive事件<br>由HeadContext触发beginRead,此时将开始监听实际感兴趣的事件
eventloop -> processSelectedKeys();<br>
NioMessageUnsafe.read()<br>-> doReadMessages(readBuf)<br>-> javaChannel().accept()<br>-> new NioSocketChannel(this, ch)<br>-> pipeline.fireChannelRead(readBuf.get(i));<br>-> ServerBootstrapAcceptor.channelRead()<br>-> childGroup.register(channel)<br>-> next().register(channel)
注册的两个部分<br>
register() 保存选择出来的eventLoop<br>
register0() jdk-channel实际注册,未监听任何事件
beginRead() 监听感兴趣的事件
ChannelPipeline<br>
pipeline初始化<br>
new AbstractChannel(Channel parent)<br>-> pipeline = newChannelPipeline()<br>-> new DefaultChannelPipeline(Channel channel)
this.channel = ObjectUtil.checkNotNull(channel, "channel");<br>tail = new TailContext(this);<br>head = new HeadContext(this);<br>head.next = tail;<br>tail.prev = head;
pipeline默认结构<br>
添加ChannelHandler<br>
ChannelPipeline.addLast(ChannelHandler... handlers)<br>
判断是否重复添加<br>checkMultiplicity(handler);
创建节点<br>newCtx = newContext(group, filterName(name, handler), handler);
添加至链表<br>addLast0(newCtx);
回调添加完成事件<br>callHandlerAdded0(newCtx);<br>->ctx.handler().handlerAdded(ctx);<br>ctx.setAddComplete();
删除ChannelHandler<br>
ChannelHandler分类<br>
inBound事件的传播<br>
顺序传播<br>
outBound事件的传播<br>
逆序传播<br>
异常的传播<br>
当前节点顺序传播<br>
问题
根据class类型<br>
根据传播规律添加<br>
ctx当前节点<br>pipeline头尾传播
ByteBuf
ByteBuf结构及重要API<br>
ByteBuf分类
ByteBufAllocator内存分配器<br>
UnpooledByteBufAllocator<br>
heap通过创建数组[]分配内存<br>
direct通过调用jdk-nio创建直接内存ByteBuffer
PooledByteBufAllocator<br>
结构
PoolArena<br> - DirectArena<br> - HeapArena<br>Thread通过PoolThreadLocalCache创建的PoolThreadCache与某个Arean绑定<br>PooledByteBufAllocator每次创建时同时创建2种Arean
内存规格<br>
MemoryRegionCache<br>
每个节点为该种规格的RegionCache,通过内部的queue来存储这种规格的内存<br>
PoolThreadCache结构<br>
Thread - PoolThreadCache - Allocator 三者关系<br>
Arenas默认大小为2 * cpu核心数<br>Arena用于开辟一块连续内存
PoolThreadCache - MemoryRegionCache 关系 memCache用于缓存一块连续内存<br>
PoolArena结构<br>
Chunk结构以及Page切分<br>
缓存的分配流程<br>
PooledByteBufAllocator.newDirectBuffer(int, int)<br>-> directArena.allocate<br>-> newByteBuf(maxCapacity); 从Recycler中获取一个纯净对象<br>-> allocate(cache, buf, reqCapacity); 给这个纯净对象分配内存
PoolThreadCache.allocateNormal(PoolArena<?>, PooledByteBuf<?>, int, int)<br>首先在cache上进行内存分配
PoolArena.allocateNormal(PooledByteBuf<T>, int, int)<br>cache无法内存分配时,由arena分配
命中缓存的分配流程<br>
计算缓存节点,tiny通过除以16得出节点下标<br>拿到MemoryRegionCache
MemoryRegionCache.queue.poll(); 弹出一个Entry<br>initBuf(); 将弹出的entry所代表的内存分配给ByteBuf<br>-> buf.init() 完成初始化
entry.recycle(); 将弹出的entry丢回对象池(默认只回收1/8)<br>
未命中缓存的分配流程<br>
page级别内存分配<br>PoolArena.allocateNormal(PooledByteBuf<T>, int, int)
PoolChunkList.allocate(PooledByteBuf<T>, int, int)<br>第一次,此时List内部为空,即还没有chunk
Chunk通过一个平衡二叉树来保存内存分配情况<br>
PoolChunk.allocateRun(int)<br>-> int d = maxOrder - (log2(normCapacity) - pageShifts); 计算需要在第几层分配<br>-> int id = allocateNode(d); id即表示树中的第几个节点,作为handle返回<br>-> updateParentsAlloc(id); 标记父节点内存被使用<br><br>PoolChunk.initBuf(PooledByteBuf<T>, long, int)<br>-> PooledByteBuf.init(PoolChunk<T>, long, int, int, int, PoolThreadCache)<br>分配完毕,保存chunk和handle即可指向一块内存
subpage级别内存分配<br>
ByteBuf的回收<br>
拿到MemoryRegionCache节点,添加至队列<br>
当缓存队列满后加入失败,则标记分配到的连续内存为未使用
通过recycle()回收至Recycler
总结
heap/direct safe/unsafe pooled/unpooled<br>
Allocator持有Arena数组,Arena用于分配内存<br>通过PoolThreadCache将线程与Arena绑定,默认一个Nio线程持管理一个Arena
huge - 直接分配<br>normal - page<br>small/tiny - subpage
Netty解码<br>
ByteToMessageDecoder
通过cumulation累加字节<br>
decodeRemovalReentryProtection(ctx, in, out);<br>调用子类的docode方法进行解析
未解析数据则跳出循环<br>解析到数据至out时则循环传播解析到的list后clear
FixedLengthFrameDecoder<br>
直到可读字节数达到一帧则读取ByteBuf至out<br>
LineBasedFrameDecoder
以\r\n或\n作为分隔符读取一帧
当发现已经超出所设的最大长度时,则丢弃下一个分隔符前的所有字节<br>
DelimiterBasedFrameDecoder
构建时当发现分割符为LineBase则初始化LineBasedFrameDecoder<br>
逻辑同LineBasedFrameDecoder<br>当有多个分隔符时,每次取最小的帧,即以最近的分隔符为截止点
LengthFieldBasedFrameDecoder
frameLength += lengthAdjustment + lengthFieldEndOffset;<br>当frameLength大于最大帧限制时,直到将该帧字节全部丢弃完毕才会退出丢弃模式,逻辑同其余解码器
总结
通过一定规则累积ByteBuf,当满足一帧时向后传播<br>
如上
Netty编码<br>
writeAndFlush()<br>
通过pipeline调用时从tail节点传播,否则从当前节点传播,见pipeline传播机制<br>
acceptOutboundMessage(msg)<br>I cast = (I) msg;<br>匹配对象
buf = allocateBuffer(ctx, cast, preferDirect);<br>分配内存
encode(ctx, cast, buf);<br>编码实现,由子类实现
ReferenceCountUtil.release(cast);<br>释放对象
ctx.write(buf, promise);<br>传播数据
buf.release();<br>释放内存
HeadContext.write(ctx, msg, promise)<br>-> unsafe.write(msg, promise);
msg = filterOutboundMessage(msg);<br>检测msg类型是否支持,将buf变为directBuf
outboundBuffer.addMessage(msg, size, promise);<br>
添加至buffer<br>
setUnwritable(invokeLater);<br>-> fireChannelWritabilityChanged(invokeLater);<br>当大于64 * 1024时,设置不可写状态
outboundBuffer.addFlush();<br>当总pending字节小于低水位时则设置为可写状态
状态
AbstractNioByteChannel.doWrite(ChannelOutboundBuffer in)<br>
in.current()<br>拿到flushedEntry的msg
ByteBuf buf = (ByteBuf) msg;<br>
in.remove();<br>
当jdk底层无法写入时,之后可能的某个状态<br>
问题
通过编码规则写入ByteBuf,通过ctx或pipeline传递至HeadContext节点<br>
Netty性能优化工具类解析
FastThreadLocal
每次创建都有唯一ID
index = InternalThreadLocalMap.nextVariableIndex();<br>每次调用构造函数都分配唯一的index
get()实现<br>
slowGet() - fastGet()
slowGet() -> ThreadLocal<InternalThreadLocalMap><br>通过jdk线程变量存储该Map
fastGet((FastThreadLocalThread) thread);<br>直接拿到FastThread内部成员变量Map
每个Thread维护一个数组<br>
Object[] array = new Object[32];<br>Arrays.fill(array, UNSET);<br>默认大小32
每个Thread持有一个InternalThreadLocalMap,为一个数组<br>
每个FastThreadLocal持有一个index,<br>即可在该Thread内的数组中获取该线程变量
不同线程含有不同数组,即ThreadLocal在不同线程之间是隔离的<br>
index为0是variablesToRemoveIndex,故实际有效下标从1开始<br>
Object v = threadLocalMap.indexedVariable(index);<br>直接根据索引号从该线程所持有的数组中获取value
当获取的值为null时调用 -> initialValue();<br>随后将该值设入该线程所持有的数组中
addToVariablesToRemove(threadLocalMap, this);<br>
set()实现<br>
获取map<br>
设置值后调用addToVariablesToRemove<br>
remove时将该index位置设置为UNSET<br>当remove调的值不是UNSET时调用onRemoval(v)<br>removeFromVariablesToRemove同时将0位置的set中的该FastThreadLocal引用移除
Recycler<br>
创建
FastThreadLocal<Stack<T>><br>每个线程持有一个Stack
maxCapacity = 32k<br>ratioMask = 7 即只回收1/8的对象<br>maxDelayedQueues = 2 * cpu<br>avaliable = 32 / 2 = 16k
recycler.get()<br>
获取线程变量Stack<br>
stack.pop()<br>从Stack弹出一个handle
stack.pop()<br>
scavengeSome()
boolean WeakOrderQueue.transfer(Stack<?> dst)<br>每次transfer转移一个Link块内的数据
当handle为空时,创建一个handle<br>并调用newObject()创建一个对象与handle绑定
回收对象<br>handle.recycle(this);<br>stack.push(this);
同线程回收对象<br>pushNow(item);
默认情况只回收1/8的未被回收过的对象<br>
直接放入stack的【DefaultHandle<?>[] elements】中<br>
结构
每一个link包含一个handles,默认大小为16<br>每次分配一个link,即批量分配可回收的handle空槽
绑定关系<br>
每次创建WeakOrderQueue都插入head的头部<br>原始Stack就可以通过单向链表获得外部线程回收的对象
异线程回收对象<br>pushLater(item, currentThread);
FastThreadLocal<Map<Stack<?>, WeakOrderQueue>><br>获取线程变量中存储的WeakOrderQueue
queue = WeakOrderQueue.allocate(this, thread))<br>获取为空时创建一个queue
当Link满时申请空间后创建一个link<br>tail.elements[writeIndex] = handle; 在link中存储该handle<br>handle.stack = null; handle存储于Link,此时已不属于原始Stack
总结
Netty设计模式应用<br>
单例模式<br>
ReadTimeoutException
MqttEncoder<br>
策略模式<br>
DefaultEventExecutorChooserFactory.newChooser(EventExecutor[])<br>
PowerOfTowEventExecutorChooser<br>
GenericEventExecutorChooser
装饰器模式
WrappedByteBuf及其子类
观察者模式<br>
ChannelFuture为被观察者<br>addListener添加监听器即观察者
writeAndFlush()<br>Promise为被观察者<br>Future为观察者
迭代器模式<br>
Bytebuf.foreach<br>
责任链模式<br>
Pipeline<br>
责任处理器接口<br>ChannelHandler
责任链<br>ChannelPipeline
上下文ChannelHandlerContext<br>通过ctx next/prev构成双向链表
责任终止机制<br>netty - fire<br>other - return false
总结<br>
优化<br>
单机调优<br>
应用调优
耗时任务需要单独的线程池
0 条评论
下一页