servreSocketChannel
触发OP_READ事件
对上面配置的NioServerSocketChannel(作为服务器的通道实现)实例化吧
next().register(channel);
EventLoop
server 启动过程中。执行runAllTask(),NioServerSocketChannel 才会注册到 BossGroup 中的 NioEventLoop 的 Selector 上。
初始化了一个 Queue<Runnable> taskQueuethis.font color=\"#e74f4c\
调用父类的构造器
processSelectedKeys();
bossGroup
runAllTasksFrom(taskQueue)
AbstractChannel.register()
initChannel
register0(promise);
//child 就是 client 连接的 NioSocketChannelfinal Channel child = (Channel) msg;//childHandler 就是我们自定的 Handler,将自定义的Handler放入客户端的channel的pipeline中 child.pipeline().addLast(childHandler);
childGroup 为 workerGroup(MultithreadEventExecutorGroup),child 为 客户端连接的 NioSocketChannel
客户端连接,触发BossGroup上的Selector的Accept事件
plipline
EventLoopGroup
数据读取 read 事件
调用pipeline的 channelHandler的handlerAdded
head
pipeline.fireChannelRegistered();
回调在 ServerBootstrap 中ChannelInitializer 中自定义添加的 Handler 放入 NioSocketChannel的 pipline 中
tail
group = bossGroup
initAndRegister();
AbstractChannel.doRegister();
ServerBootstrapAcceptor .channelRead()
ChannelInitializer
分配 ByteBuf - 堆外内存(零拷贝)
childGroup = workerGroup
NioSocketChannel——pipeline
SocketUtils.accept(javaChannel());
channel.pipeline();
调用NioServerSocketChannel的pipeline的Handler的channelRead方法,NioServerSocketChannel 的 pipline 中只有ServerBootstrapAcceptor 这一个处理器
将NioServerSocketChannel注册到 selector中
runTask(task);
SingleThreadEventExecutor.startThread();
channelFactory.newChannel();
processSelectedKeysOptimized();
监听端口
doReadMessages(readBuf);
ServerBootstrapAcceptor.channelRead()
bootstrap.childHandler=new ChannelInitializer<SocketChannel>()
sctpChannel.configureBlocking(false);设置为非阻塞
实际就是调用的NioServerSocketChannel的pipeline 的 ServerBootstrapAcceptor 的 channelRead 方法
bootstrap.bind(9000)绑定端口
pipeline.fireChannelRead(readBuf.get(i))
客户端发送数据,触发WorkerGroup的OP_READ事件
config().group().register(channel);
new NioServerSocketChannel()
selector
实际创建的是NioEventLoopnew NioEventLoop()
NioServerSocketChannel注册完之后回调
SingleThreadEventExecutor 将上面的 register 事件,放入 taskQueue 队列中,异步执行。taskQueue.offer(task);
task
ServerBootStrap.init()
strategy = select(curDeadlineNanos);
ServerBootstrapAcceptor是ServerBootStrap的内部类
finally
AbstractNioByteChannel.read()
调用pipeline的 channelHandler的channelRegistered
NioServerSocketChannel 调用的是BossGroup中的NioEventLoop线程的run方法
触发Accept事件,触发的是BossGroup中的NioEventLoop的selector
pipeline.fireChannelRead(byteBuf);
这里调用是NioServerSocketChannel的pipeline的handler的registered方法。实际也就是执行ServerBootstrapAcceptor.channelRegistered(),head、tail节点只是作为标识节点,没有实际作用。ServerBootstrapAcceptor 的 channelRegistered 方法是空实现没有做任何事情
((ChannelDuplexHandler) handler).channelRegistered(this);
MultithreadEventExecutorGroup
死循环监听io事件,当有事件发生或者达到超时时间,解除阻塞,往下执行(会等一会看有没有事件,如果没有事件发生也会超时退出,执行初始化逻辑)
workerGroup
将SocketChannel包装成NioSocketChannel
for (int i = 0; i < selectedKeys.size; ++i)
ranTasks = runAllTasks();
new NioEventLoopGroup(3)
p.addLast(new ChannelInitializer<Channel>() {....})
实例化
pipeline = newChannelPipeline();创建该channel对应的pipleine
NioMessageUnsafe.read()
readBuf存放是的本次连接事件过来,生成的所有的NioSocketChannel
ServerBootstrapAcceptor
至此,轮询调用 Worker Group 中的 NioEventLoop 执行 NioSocketChannel 的 register 流程。和 NioServerSocketChannel 的注册业务流程一样(封装具体的业务的注册流程register0,调用的NioEventLopp的executer方法-该方法将register0放入的当前的NioEventLoop的TaskQueue中。然后启动的本人的线程,执行run方法——死循环去监听事件selector.select(),当事件触发或者达到超时时间解除阻塞,执行processSelectedKeys();,runAllTasks())。
client
SelectorTuple 是Netty 对 selector 做了一次包装,this.selector = selectorTuple.selector;this.unwrappedSelector = selectorTuple.unwrappedSelector;复制给 NioEventLoop 的属性
服务端的 NioServerSocketChannel 注册到 BossGroup中的NioEventLoop
ch.configureBlocking(false);设置为非阻塞
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { unsafe.read(); }
调用所有的Handler的 channelRead方法
channel(NioServerSocketChannel.class)
初始化 BossGroup、workerGroup线程, NioEventLoopGroup 的 children 属性 就是 NioEventLoop (也是一个线程池),初始化 selector - 多路复用器、和 TaskQueue 任务队列。
给 ServerSocketChannel的pipeline 上添加一个 ChannelInitializer ,public void initChannel(final Channel ch)在 channel 注册时调用
port
死循环
newChannelPipeline()
Accept 事件- 连接
处理异步队列里面的任务
new DefaultChannelPipeline(this);
childGroup.register(child)
NioServerSocketChannel--Pipeline
super(parent);
OP_READ | OP_ACCEPT)
在初始操作
客户端的 NioSocketChannel 注册到 WorkerGroup中的NioEventLoop
SingleThreadEventExecutor.this.run();(调用 NioEventLoop.run())
调用Nio的代码,返回客户端创建SocketChannle serverSocketChannel.accept();
SingleThreadEventLoop
taskQueue
调用nio的代码,provider.openSelector();final SelectorTuple selectorTuple = openSelector(); this.selector = selectorTuple.selector;
从bossGroup中拿一个线程(NioEventLoop)来处理,将channel 注册到 自身的 selector 上。
eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } });调用的是NioEventLoop中的exector方法
ServerBootstrap
for循环处理有事件发生的channel
pipeline.invokeHandlerAddedIfNeeded();