Netty 源码流程图
2022-11-17 12:21:24   0  举报             
     
         
 Netty 服务端源码流程图
    作者其他创作
 大纲/内容
 ChannelPipeline p = channel.pipeline()
    开始管道注册
  监听端口
  线程组
  head 结点
  ChannelHandler
  这里反射调用的就是 NioServerSocketChannel.class 这个类 的无餐构造方法
  具体调用
  读取数据完成后调用
  设置子线程组 
  分配一个 byteBuf  牵扯到零拷贝
  获取
  原生 NIO方法 等待时间发生 但是传入了等待时间
  next.invokeChannelRegistered()
  ((ChannelInboundHandler) handler()).channelRegistered(this)
  匿名方法 run
  Runnable task = pollTask()
  写(事件)
  ch.configureBlocking(false)
  调用放入 queue的注册任务
  获取 socketChannel
  设置属性
  HeadContext
  加入一个handler   piple管道的样子
  this.childGroup = childGroup
  ch.eventLoop().execute(new Runnable()
  processSelectedKeys()
  加入 childHandler childHandler 是创建启动类 ServerBootstrap 时候加入的 new ChannelInitializer()
  打开 Selector后 赋值给 selector
  核心方法 进行注册
  这里就可以看出 NioEventLoopGroup 实例 里面有个现成数组 数组放的是 NioEventLoop
  selector = selectorTuple.selector
  doBind(localAddress)
  继承
  激活管道注册 传入 handler 头节点
  SelectorProvider.provider()
  void channelInactive(ChannelHandlerContext ctx)
  unsafe.read()
  void channelActive(ChannelHandlerContext ctx)
  Channel注销时调用
  ServerBootstrap bootstrap = new ServerBootstrap()
  pipeline.invokeHandlerAddedIfNeeded()
  原生 NIO
  创建管道头节点
  创建 NioEventLoopGroup
  获取 pipeline  这里获取的 是serversocketcChannelPipeline 因为 还没有调用 doReadMessages方法 socketChanel 还没创建
  设置 NioServerSocketChannel.class 构造方法
  ctx.fireChannelRegistered()
  for循环 
  safeExecute(task)
  children = new EventExecutor[nThreads]
  io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
  实际调用
  return serverSocketChannel.accept()
  runAllTasks(ioTime * (100 - ioRatio) / ioRatio)
  这里会通过链表遍历 需要调用的 handler
  childGroup 线程组 注册
  return constructor.newInstance()
  child.pipeline().addLast(childHandler)
  逐一调用
  处理事件
  创建任务队列 Queue初始化LinkedBlockingQueue
  SelectStrategy.SELECT
  持有属性
  调用
  获取 selector
  public void initChannel(final Channel ch)
  单线程 事件循环线程
  执行run 方法
  ChannelInboundHandler
  register0(promise)
  创建 NioEventLoop
  返回
  创建 Pipeline管道
  run()
  select 次数自增
  startThread()
  for 死循环
  ChannelHandlerContext
  回调方法
  查找 handler
  new NioEventLoopGroup
  provider.openSelector()
  方法调用 注册
  打开 Selector 等于调用原生 NIO Selector.open()
  TailContext
  selectCnt ++
  调用父类构造方法
  config().group().register(channel)
  next().register(channel)
  业务Handler
  寻找 handler
  return ctx
  强转为 SocketChannel 然后 socketChannel的 pipeline 加入一个handler
  相互指向引用节点
  io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)
  获取的是 socketChannel 的 pipeline 因为已经注册了
  创建的线程池也传入到了 NioEventLoop中使用
  把创建的ServerSocektChannel  赋值给属性
  SingleThreadEventExecutor.this.run()
  设置为非阻塞
  SingleThreadEventExecutor
  pipeline.fireChannelRead(byteBuf)
  初始化并且注册
  执行启动线程
  连接(事件)
  ServerBootstrapAcceptor
  channelFactory(new ReflectiveChannelFactory<C>(channelClass))
  ChannelFuture regFuture = initAndRegister()
  创建管道尾结点
  核心就是NIO 原生写入 
  void channelReadComplete(ChannelHandlerContext ctx)
  通过构造方法 创建 NioServerSocketChannel.class 实例
  会根据操作系统 一般linux 调用 new EpollSelectorImpl
  加入
  获取事件 整型值
  handler != null加入到 pipeline 最后
  数据读取完毕后调用
  void channelUnregistered(ChannelHandlerContext ctx)
  打开 Selector
  addTask(task)
  这里有连接事件 读写事件才开始循环
  等待回调方法 进行注册
  绑定端口  并且阻塞同步
  NioEventLoop
  // 多路复用器Selector selector// LinkedBolockingQueue 阻塞队列Queue<Runnable> taskQueu
  调用父类构造
  判断 executor == null
  javaChannel()
  加入的就是这个 匿名方法 从 queue 获取的时候会执行这个方法
  Netty parent 和 child 线程组
  获取 pipeline
  设置属性 ACCETP 感兴趣的事件整型值 但是还没有设置到 selector
  调用方法 创建 pipeline
  调用之后  socketChannel 当前的pipeline
  for (int i = 0; i < selectedKeys.size; ++i)
  启动线程
  进行绑定端口
  判断是否写事件
  设置 parentGroup
  阻塞队列api 加入
  invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED))
  因为创建的是 ServerBootStrap 所以 config 是 ServerBootStrapgroup 就是 设置的线程组
  io.netty.channel.AbstractChannel.AbstractUnsafe#register
  判断传入 nThreads == 0 条件成立会使用默认 DEFAULT_EVENT_LOOP_THREADSNettyRuntime.availableProcessors() * 2 CPU核心数量 * 2 
  构造方法 传入null Executor
  io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
  SelectionKey k = selectedKeys.keys[i]
  io.netty.util.concurrent.SingleThreadEventExecutor#execute
  进行调用
  加入任务
  taskQueue.offer(task)
  Channel
  executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
  for (;;)
  运行全部任务 从阻塞队列 queue
  private final List<Object> readBuf = new ArrayList<Object>()
  激活调用 ChannelRead
  从队列获取任务
  原生NIO  参考代码            // NIO的非阻塞是由操作系统内部实现的,底层调用了linux内核的accept函数            SocketChannel socketChannel = serverSocket.accept();
  SelectorTuple selectorTuple = openSelector()
  true
   select(wakenUp.getAndSet(false))
  NioServerSocketChannel()
  调用handler channelRegistered方法 这里就是开发人员写 handler 重写 channelRegistered方法 会被调用
  实际调用方法
  int readyOps = k.readyOps()
  线程池数组
  void channelRegistered(ChannelHandlerContext ctx)
  regFuture.isDone()
  this.constructor = clazz.getConstructor()
  for(;;)
  addLast0(newCtx)
  selector.select(timeoutMillis)
  doReadMessages(readBuf)
  Executor
  super(parent)
  for (int i = 0; i < size; i ++)
  客户端连接服务端的时候会回调该方法
  AbstractNioChannel
  触发重写 handler 注册的接口 方法
  读(事件)
  这里会触发注册逻辑 真正的客户端发送数据是注册后 才开始有的读写,第一次连接只是先注册pipeline 是serverSocketChannel 的pipeline
  循环列表
  pipeline
  read 方法所属类 NioMessageUnsafe  属性 一个readBuf 集合 
  执行 run方法
  使用NioServerSocketChannel作为服务器的通道实现
  进行绑定
  客户端断开连接调用
  io.netty.channel.nio.AbstractNioChannel#doRegister
  AbstractChannelHandlerContext ctx = this
  执行放到 run里面的 Runnable
  io.netty.channel.DefaultChannelPipeline.HeadContext#channelRegistered
  super.group(parentGroup)
  阻塞队列
  触发重写 handler 读取数据完毕接口方法
  调用这个实例的方法
  Channel child = (Channel) msg
  原生NIO
  ctx = ctx.next
  offerTask(task)
          AbstractChannelHandlerContext prev = tail.prev;        newCtx.prev = prev;        newCtx.next = tail;        prev.next = newCtx;        tail.prev = newCtx;
  获取 socketChannel 
  替换后
  head.next = tail;tail.prev = head;
  执行 select方法 不是完全阻 有超时时间传入塞 
  触发重写 handler 连接接口方法
  processSelectedKeysOptimized()
  线程组执行 Runnable
  执行注册
  io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
  传入的 NIO SelectorProvider.provider() 赋值给 provider
  打开 serverSocketChannel 原生 NIO
  执行绑定操作
  参考代码 ServerBootstrap 启动类
  调用完注册后  socketChannel会加入 开发人员业务的handler
  设置 channel工厂 把 NioServerSocketChannel.class 传入
  遍历调用  pipeline 的handler
  private final Queue<Runnable> taskQueue
  交给线程池一个任务
  原生NIO代码 注册 selector 
  重载构造
  for循环
  .channel(NioServerSocketChannel.class)
  doStartThread()
  .childHandler(new ChannelInitializer<SocketChannel>()
  包装为 AbstractChannelHandlerContext
  io.netty.channel.socket.nio.NioSocketChannel#doReadBytes
  ChannelHandler handler = config.handler()
  Nio 原生方法 处理事件key
  fireChannelRead
  ChannelInitializer
  next() 从线程获取一个 EventLoop 进行注册初始化里面有很多 只注册一个
  核心代码 调用业务 handler处理读取的数据 
  allocHandle.lastBytesRead(doReadBytes(byteBuf))
  io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
  allocHandle.allocate(allocator)
  run方法
  创建通道初始化对象,设置初始化参数
  初始化线程池
  bind(new InetSocketAddress(inetPort))
  服务端启动流程 创建对象
  Selector selector = this.selector;
  protected void initChannel(SocketChannel ch)
  next.invokeChannelRead(m)
  serversocketchannel 注册回调
  激活管道注册
  通过注册的 chanel 的线程组执行线程任务ServerSocketChannel
  处理事件 key
  读取 byteBuf 数据
  pipeline.fireChannelRead(readBuf.get(i))
  p.addLast(new ChannelInitializer<Channel>() 
  NioEventLoopGroup整体类内部结构
  如果初始化注册成功
  io.netty.bootstrap.ServerBootstrap#init
  tail = new TailContext(this)
  NIO
  客户端
  因为 pipeline 是 serverSocketChannel 这里会调用regester0 注册时候放入的 ServerBootstrapAcceptor 然后进行注册
  加入到最后,但是不是尾部,是 pipeline 的最后,tail的前一个
  ChannelPipeline pipeline = pipeline()
  newSocket(DEFAULT_SELECTOR_PROVIDER)
  执行调用
  这里注意 是 socketChannel (客户端)不是 serverScoketChannel (服务端)
  实现
  if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)
  SocketChannel ch = SocketUtils.accept(javaChannel())
  ServerBootstrap 
  // EventLoopGroup 接口 EventLoopGroup childGroup
  this.ch = ch
  doReadBytes(byteBuf)
  安全执行任务
  msg是传过来的 socketChannel
  provider.openServerSocketChannel()
  this.readInterestOp = readInterestOp
  head = new HeadContext(this)
  父类构造方法 传入ServerSocektChannel 和  ACCEPT事件
   executor.execute(new Runnable()
  return (SocketChannel) super.javaChannel()
  chanel 就是工厂创建的ServerSocketChannel 
  执行绑定 doBing0
  io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe#read
  doRegister()
                  SocketChannel sc = iterator.next();                ByteBuffer byteBuffer = ByteBuffer.allocate(128);                // 非阻塞模式read方法不会阻塞,否则会阻塞                int len = sc.read(byteBuffer);
  for 循环
  @Override                        protected void initChannel(SocketChannel ch) throws Exception {                            //对workerGroup的SocketChannel设置处理器                            ch.pipeline().addLast(new NettyServerHandler());                        }
  初始化 channel
  SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider()
  循环对线程池数组进行赋值
  if ((readyOps & SelectionKey.OP_WRITE) != 0)
  NIO 
  判断是否是读连接事件
  pipeline.fireChannelReadComplete()
  ChannelPipeline pipeline = ch.pipeline()
  io.netty.channel.nio.NioEventLoop#run
  参考原生NIO代码
  Channel注册候调用
  注册
  执行读取
  获取设置的 options属性 进行能够设置
  io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
  读取消息
  Runnable  肯执行run方法
  再次调用父类 并且传入 SelectionKey.OP_READ 事件整型值
  进行调用 ChannelRead 方法
  加入 pipeline 最后
  这个不是 线程池的 execute 
  循环所有 selectedKyes
  属性
  进行循环 readBuf
  这个方法会调用 init 初始化时候放入的 handler  比较繁琐 直接看 回调方法 
  NioEventLoopGroup
  // 线程数组存放的是 NioEventLoopEventExecutor[] children (NioEventLoop)
  provider = selectorProvider
  调用 group方法 设置两个线程组 bossGroup(parentGroup)workerGroup(childGroup)
  参考NIO代码
  处理 连接 | 读| 写等事件 key
  childGroup.register(child).addListener(new ChannelFutureListener()
  ChannelFuture cf = bootstrap.bind(9000).sync()
  连接 会产生 ACCEPT事件
  读取数据调用
  int size = readBuf.size()
  加入 pipleline
  创建线程池数组 长度是nThreads
  核心代码 把 socketChannel 数据都写到 byteBuf
  pipeline = newChannelPipeline();
  AbstractChannelHandlerContext.invokeChannelRegistered(head)
  客户端连接时调用
  父类构造方法
  绑定
  this(newSocket(DEFAULT_SELECTOR_PROVIDER))
  eventLoop.execute(new Runnable()
  加入 ChannelInitializer
  执行绑定
  for (int i = 0; i < nThreads; i ++)
  初始化
  pipeline.fireChannelRegistered()
  init(channel)
  创建 socket 传入的是 原生NIOSelectorProvider.provider()
  task.run()
  寻找 Inbound handler
  ChannelPipeline
  tail 结点
  channel = channelFactory.newChannel()
  pipeline.addLast(handler)
  final ChannelPipeline pipeline = pipeline()
  taskQueue = newTaskQueue(this.maxPendingTasks)
  调用创建的工厂
  private final EventExecutor[] children
  这里是创建的 new ServerBootstrap
  调用父类
  执行注册 regester0方法 的时候才会 回调这个 加入handler
  加入到队列
  findContextInbound(MASK_CHANNEL_REGISTERED)
  把回调方法 要加入的handler加入 并且替换 channelInitializer handler
  channel.eventLoop().execute(new Runnable()
  创建 NioSocketChannel
  pipeline.fireChannelActive()
  处理异常时调用
   
 
 
 
 
  0 条评论
 下一页
  
   
   
   
   
  
  
  
  
  
  
  
  
 