Netty核心源码流程
2024-01-22 21:46:19   1  举报             
     
         
 Netty启动流程,Selector事件监听,线程模型核心源码详解
    作者其他创作
 大纲/内容
 pipeline = newChannelPipeline();
  创建客服端Group
  连接客服端监听的端口
  selectedKeys != 0有事件发生
  startThread();    doStartThread();
  channel = channelFactory.newChannel();
  客服端
  processSelectedKeys();处理selectedKeys事件
  for (;;) 死循环监听等待事件发生
  readBufNIOSocketChannel
  业务Handler
  p.addLast(new ChannelInitializer<Channel>(){...}
  bootstrap.bind(inetPort)绑定监听端口Netty服务端核心源码
  MultithreadEventLoopGroupnext().register(channel);
  设置WorkGroup
  NioServerSocketChannel构造方法
  int selectedKeys = selector.select(timeoutMillis);事件监听,当有事件发生或者等待超时执行下面的方法
  selector
  ChannelPipeline p = channel.pipeline();
  init(channel);
   doRegister();
  super(parent);
  判断是事件类型
  TailContext
  callHandlerAddedForAllHandlers();
  new NioEventLoop()
  pipeline.invokeHandlerAddedIfNeeded();
  BossGroup
  super()NioEventLoopGroup();默认线程数为cpu*2
  ChannelOption设置TCP参数Netty提供灵活的TCP参数配置能力
  NioServerSocketChannel.classNetty提供的NIO包装类后面会通过反射实例化
  addTask(task)
  设置BossGroup
  constructor.newInstance();
  服务端(ServerSocketChannel)的pipeline结构
  trueEventLoop未初始化
  调用服务channel pipeline里的Handlerpipeline.fireChannelRead(readBuf.get(i));
  bootstrap.bind(9000)
  拿到pipeline 的头,调用管道中的所有HandlerPendingHandlerCallback task = pendingHandlerCallbackHead;while (task != null) {            task.execute();            task = task.next;        }
  ChannelInitializer
  register(channel)
  OP_ACCEPT连接事件
  NioEventLoopGroup bossGroup监听Selector上的连接事件
  addTask()异步执行
  config().group()获取最开始设置的BossGroup线程组
  注意:此时线程是workerGroup中的线程。监听的事件也是workerGroup中某一个NIoEventLoop上的Selector事件OP_READ读事件
  放入TaskQueue的任务pipeline.addLast(Handler);
  newChild()
  new NioSocketChannel
  开启线程异步执行TaskQueue中的任务
  childHandlerinitChannel(SocketChannel ch){ch.pipeline().add(Handler)}客服端channel 的管道,其中保存着业务Handler,执行自定义或者Netty提供的业务代码
  创建服务端Group
  eventLoop.execute()netty封装的线程,并不是开启线程任务,执行这个方法还有业务逻辑执行
  将连接过来的SocketChannel注册到workerGroup一个线程的Selector上childGroup.register(child)
  WorkerGroup
  执行服务端pipeline的HandlerServerBootstrapAcceptor的channelRead方法
  child.pipeline().addLast(childHandler);
  移除ChannelInitializer
  HeadContext
  读取数据到byteBufdoReadBytes(byteBuf)
  注册连接事件添加ServerBootstrapAcceptor
  new DefaultChannelPipeline(Channel)
  这里使用的是直接内存Netty中零拷贝的核心byteBuf = allocHandle.allocate(allocator);
  NioSocketChannel
  workerGroup中的线程选择逻辑,轮询方式选择executors[idx.getAndIncrement() & executors.length - 1];
  NioSocketChannel.classNetty提供的NIO包装类后面会通过反射实例化
  taskQueue
  反射创建NioServerSocketChannel.
  serverSocketChannel.accept();
  Bootstrap
  run()
  public NioServerSocketChannel() {        this(newSocket(DEFAULT_SELECTOR_PROVIDER));    }
  读事件的注册和连接事件的注册逻辑一样都是放入TaskQueue异步进行注册完将自定义的Handler写入SocketChannel的pipeline
  创建EventLoop
  runAllTasks()Runnable task = pollTask();
  创建config,放入serversocketchannel和ServerSocket后面注册监听事件和绑定端口从config中获取
  nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads
  next()
  设置Group
  返回NIO的SocketChannelSocketChannel ch = SocketUtils.accept(javaChannel());
  再开启一个线程异步执行下面的方法executor.execute
  doBind(localAddress);
  创建服务端pipeline
  移除ChannelInitializerpipeline.remove(this);
   NioEventLoop
  NioEventLoopGroup workerGroup监听Selector上的读写事件
  获取BossGroup中的NioEventLoop
  fireChannelRead()
  线程选择
  依次调用pipeline中SocketChannel中的Handler上的channelRead方法pipeline.fireChannelRead(byteBuf);
  ch.eventLoop().execute()
  创建serverSocketChannel
  ServerSocketChannel的pipeline结构
   this(newSocket(DEFAULT_SELECTOR_PROVIDER));
  new
  taskQueue = newTaskQueue(this.maxPendingTasks);
  ServerBootstrap
  group
  有客服端连接注册读事件时添加业务Handler
  selector = selectorTuple.selector;
  taskQueue.offer(task);将任务放入创建BossGroup是初始化的任务队列中
  注册Accept/OP_READ事件到Selector
  socketChannel.connect(remoteAddress);
  依次调用pipeline中SocketChannel中的Handler上的ChannelReadComplete方法pipeline.fireChannelReadComplete();
  doReadMessages
  newChannelPipeline()
  if (!inEventLoop)
  register0(promise);
  执行ChannelInitializer中的initChannel方法
  Handler执行
  new NioEventLoopGroup();
  NioMessageUnsafe. unsafe.read();
  .    .   .
  newSocket(DEFAULT_SELECTOR_PROVIDER)provider.openServerSocketChannel();
  设置serversockerchannerthis.ch = ch;设置连接监听事件this.readInterestOp = readInterestOp;设置非阻塞ch.configureBlocking(false);
  服务端
  select(wakenUp.getAndSet(false));
  ServerBootstrapAcceptor
  for (;;){    safeExecute(task);    task = pollTask();    if(task == null){        brek;    }}执行TaskQueue中的任务
  SocketChannel的pipeline结构
                          HandlerinitChannel(SocketChannel ch){ch.pipeline().add(Handler)}添加业务Handler,客服端在接收或者发送数据是会依次调用这些Handler
  parent是 NioServerSocketChannelthis.parent = parent;pipeline = newChannelPipeline();
  pipeline 
  NioEventLoop的run方法SingleThreadEventExecutor.this.run();
  NioByteChannel. unsafe.read();
  将自定义Initializer放入pipeline后面注册会真正放入Handler
  super(parent);设置读事件监听值this.readInterestOp = readInterestOp;设置非阻塞ch.configureBlocking(false);将SocketChannel封装当前类属性中this.ch = ch;
  add()
  initAndRegister()
  与服务端类似initAndRegister();
  pipeline = pipeline();
  config().group().register(channel);
  往taskQueue添加的任务
  根据pipeline执行操作不同的管道
    
    收藏 
      
    收藏 
     
 
 
 
 
  0 条评论
 下一页
  
   
   
  
  
  
  
  
  
  
  
 