Netty启动源码
2021-09-29 15:04:22 0 举报
Netty启动源码流程
作者其他创作
大纲/内容
pipeline.remove(this);
使用NioEventLoopGroup里的线程池对象异步执行executor.execute(() ->SingleThreadEventExecutor.this.run());
socketChannel pipeline
将register任务加入taskQueue等待异步执行
TailContext
ChannelInitializer
初始化ServerChannel的pipeline
runAllTasks() -> runAllTasksFrom(taskQueue)循环取出taskQueue队列中的任务执行其run方法
ServerBootstrapAcceptor#ChannelRead
config().group().register(channel)
childGroup.register(child) ;将客户端连接SocketChannel以轮询的方式注册到workGroup里的某一个NioEventLoop中的Selector上
startThread();
注册感兴趣的事件readInterestOp
HeadContext
initAndRegister
pipeline = newChannelPipeline();
注册interestOps
channel = channelFactory.newChannel();
ServerBootstrap#bind -> doBind
final int interestOps = selectionKey.interestOps();selectionKey.interestOps(interestOps | readInterestOp);
通过无参构造方法实例化通过ServerBootstrap.channel(ChannelClass) 链式调用传进来的ChannelClass对象
initChannel(ch)
processSelectedKeys() -> 遍历感知到的IO事件并逐个处理 processSelectedKey
SocketChannel注册逻辑跟ServerSocketChannel注册是同一套代码
pipeline.fireChannelRead(byteBuf); 回调客户端SocketChannel对应的pipeline里面所有handler的channelRead方法
初始化SocketChannel的pipeline
new NioEventLoopGroup(nThreads) -> 父类MultithreadEventLoopGroup的构造方法
ServerBootstrapAcceptor
taskQueue = newTaskQueue(this.maxPendingTasks);
serverSocketChannel pipeline
AbstractNioChannel#doBeginRead
new NioSocketChannel()
MultithreadEventLoopGroup#next().register(channel) ->SingleThreadEventLoop#register(channel)
SingleThreadEventExecutor#execute(Runnable task)
当发生OP_ACCEPT事件:NioMessageUnsafe.read()
this.readInterestOp = readInterestOp;ch.configureBlocking(false); 非阻塞模式
NioEventLoop.run() 死循环执行:
ChannelInitializer#initChannel中添加的自定义handler
将channel和当前NioEventLoop封装成promise对象
AbstractUnsafe#register0(promise)
遍历readBuf中的所有客户端连接SocketChannel使用服务端连接ServerSocketChannel的管道执行:pipeline.fireChannelRead(readBuf.get(i)); 这里其实就会触发ServerBootstrapAcceptor的ChannelRead方法
executor = new ThreadPerTaskExecutor(...)
pipeline.fireChannelReadComplete(); 回调客户端SocketChannel对应的pipeline里面所有handler的channelReadComplete方法
为了解决select方法无效返回进而导致CPU100%的问题,Netty记录了无效唤醒的次数(默认512),一旦无效返会次数超过阈值,就会执行 selectRebuildSelector 方法新创建一个Selector替换掉当前有问题的Selector,并将原来的Seletor上注册的连接和事件转移到新的上面
将socketChannel包装为NioSocketChannel
NioServerSocketChannel#doReadMessages(readBuf),获取客户端连接并封装成NioSocketChannel,添加到readBuf
经过一系列复杂的调用链
handlerAdded(ctx)
child.pipeline().addLast(childHandler);将服务端启动时通过.childHandler(new ChannelInitializer<SocketChannel>() {....})方法添加的ChannelInitializer注册到客户端通道对应的管道中
openSelector();
数组元素类型是NioEventLoop
addTask(task) -> taskQueue.offer(task)
doBind0
SocketUtils.accept(javaChannel())
select(oldWakenUp) 又在一个死循环中执行:
super(parent);
new NioServerSocketChannel()
pipeline.fireChannelRegistered();
byteBuf = allocHandle.allocate(allocator); 默认分配堆外内存缓冲区
当注册逻辑执行到pipeline.invokeHandlerAddedIfNeeded();时会调用socketChannel pipeline里的ChannelInitializer的initChannel方法把该方法中添加的的handler全部加入客户端通道的pipeline,最终客户端通道的pipeline是这样
获取一个NIO的Selector实例
pipeline.fireChannelActive();
当发生 OP_READ事件:NioByteUnsafe.read()
当channel注册时调用
实例化一个LinkedBlockingQueue 类型的taskQueue
super(parent); -> AbstractChannel
safeSetSuccess(promise);
initChannel(ctx)
init(channel)
pipeline.invokeHandlerAddedIfNeeded(); 调pipeline里面每个handler的handlerAdded方法
channel.pipeline().addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) { ch.eventLoop().execute(() -> pipeline.addLast(new ServerBootstrapAcceptor(...)); }}),
NioSocketChannel#doReadBytes(byteBuf) 读取数据到缓冲区
ServerBootstrap#init
0 条评论
下一页