Netty Core Source Code: Full-Flow Source Walkthrough
2021-12-29 12:06:15
Outline / Content
MultithreadEventLoopGroup takes one thread from the bossGroup to handle the channel's registration, registering it on that thread's own Selector
next.invokeChannelRead(m);
processSelectedKeys();
addLast0(newCtx);
ctx.callHandlerAdded();
pipeline = newChannelPipeline();
TailContext
ServerSocketChannel
Pipeline
Put register0 into the task queue
Start the thread to process it
Process the data
Call the channelRegistered method of every Handler added via pipeline.addLast inside the childHandler
NIO wrapper code creates the SocketChannel
Listen on the port
ChannelInitializer
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
Endless loop that processes all kinds of events
EventLoop
doRegister();
register0
new MultithreadEventExecutorGroup
SocketChannel Pipeline
HeadContext
pipeline.addLast(handler);
This branch is taken when registering the SelectionKey.OP_ACCEPT | SelectionKey.OP_READ events
for (;;)
unsafe.read();
return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
SocketChannel ch = SocketUtils.accept(javaChannel());
NIO code: configure the channel as non-blocking
return (ServerSocketChannel) super.javaChannel();
ServerBootstrap.init
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
next()
runAllTasksFrom(taskQueue)
Runnable task = taskQueue.poll();
return Mpsc.newMpscQueue();
SingleThreadEventExecutor.this.run();
bootstrap.bind(9000)
childGroup.register(child)
.channel(NioServerSocketChannel.class)
SingleThreadEventLoop
Register the events SelectionKey.OP_ACCEPT | SelectionKey.OP_READ
pipeline.fireChannelRead(byteBuf);
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
pipeline.invokeHandlerAddedIfNeeded();
Starting from head, through ServerBootstrapAcceptor, to tail: recursively invoke all the InboundHandlers in the serverChannel's pipeline
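The head-to-tail traversal can be sketched as a tiny linked chain of contexts. This is a simplified stand-in for Netty's DefaultChannelPipeline and AbstractChannelHandlerContext, not the real classes; only the propagation mechanism is the same.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for Netty's pipeline: a linked list of contexts where an
// inbound event travels from head toward tail, and each handler decides
// whether to propagate it further via fireChannelRead.
public class MiniPipeline {
    interface InboundHandler { void channelRead(Ctx ctx, Object msg); }

    static final class Ctx {
        final String name;
        final InboundHandler handler;
        Ctx next; // link toward tail

        Ctx(String name, InboundHandler handler) { this.name = name; this.handler = handler; }

        // Mirrors AbstractChannelHandlerContext.invokeChannelRead(m).
        void invokeChannelRead(Object msg) { handler.channelRead(this, msg); }

        // Mirrors ctx.fireChannelRead(msg): hand the message to the next context.
        void fireChannelRead(Object msg) { if (next != null) next.invokeChannelRead(msg); }
    }

    // Builds head -> ServerBootstrapAcceptor -> tail and fires one read event.
    static List<String> propagate() {
        List<String> visited = new ArrayList<>();
        InboundHandler forwarding = (ctx, msg) -> { visited.add(ctx.name); ctx.fireChannelRead(msg); };
        Ctx head = new Ctx("head", forwarding);
        Ctx acceptor = new Ctx("ServerBootstrapAcceptor", forwarding);
        Ctx tail = new Ctx("tail", (ctx, msg) -> visited.add(ctx.name)); // tail ends the chain
        head.next = acceptor;
        acceptor.next = tail;
        head.invokeChannelRead("msg"); // pipeline.fireChannelRead starts at head
        return visited;
    }

    public static void main(String[] args) {
        System.out.println(propagate()); // [head, ServerBootstrapAcceptor, tail]
    }
}
```

In real Netty the contexts are doubly linked and outbound events travel the opposite way, from tail toward head.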
startThread();
When a client connects, a SelectionKey.OP_ACCEPT event is produced
doStartThread();
return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscUnboundedArrayQueue<T>(MPSC_CHUNK_SIZE) : new MpscUnboundedAtomicArrayQueue<T>(MPSC_CHUNK_SIZE);
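PlatformDependent picks a JCTools MPSC (multi-producer, single-consumer) queue here: any thread may offer tasks, but only the event-loop thread polls. A JDK-only sketch of that contract, with ConcurrentLinkedQueue standing in for MpscUnboundedArrayQueue (the names and counts below are illustrative):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// MPSC contract: any thread may offer, only the single event-loop thread
// polls. ConcurrentLinkedQueue is a stand-in; JCTools' MPSC queues exploit
// the single-consumer restriction for better throughput.
public class MpscSketch {
    public static int produceAndDrain(int producers, int tasksEach) throws Exception {
        Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
        Thread[] ts = new Thread[producers];
        for (int i = 0; i < producers; i++) {
            ts[i] = new Thread(() -> {
                for (int j = 0; j < tasksEach; j++) {
                    taskQueue.offer(() -> {}); // offerTask(task) from any producer thread
                }
            });
            ts[i].start();
        }
        for (Thread t : ts) t.join(); // wait for all producers

        // Single consumer, like runAllTasksFrom(taskQueue): poll until empty.
        int ran = 0;
        Runnable task;
        while ((task = taskQueue.poll()) != null) { task.run(); ran++; }
        return ran;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(produceAndDrain(4, 1000)); // 4000
    }
}
```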
callHandlerAdded0(ctx);
task.run();
return constructor.newInstance();
strategy = select(curDeadlineNanos);
NettyServerHandler
Netty Core Source Code: Full-Flow Source Walkthrough (4.1.46)
ServerBootstrapAcceptor.channelRead
final ChannelFuture regFuture = initAndRegister();
run()
The following is the NIO wrapper code
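The calls Netty wraps here are plain JDK NIO. A minimal sketch with no Netty classes (RawNioSetup is an illustrative name; port 0 means any free port):

```java
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// The raw JDK NIO steps behind NioServerSocketChannel/NioEventLoop:
// open the channel, make it non-blocking, bind, and register OP_ACCEPT
// with a Selector (backed by epoll on Linux).
public class RawNioSetup {
    public static SelectionKey setup() throws Exception {
        Selector selector = Selector.open();                 // epoll_create on Linux
        ServerSocketChannel ch = ServerSocketChannel.open(); // ServerSocketChannel.open()
        ch.configureBlocking(false);                         // required before register()
        ch.bind(new InetSocketAddress(0));                   // bind a listening port
        return ch.register(selector, SelectionKey.OP_ACCEPT);
    }

    public static void main(String[] args) throws Exception {
        SelectionKey key = setup();
        System.out.println(key.interestOps() == SelectionKey.OP_ACCEPT); // true
    }
}
```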
selector.select(timeoutMillis)
When the client sends data to the server, a SelectionKey.OP_READ event is produced
Initialize the SocketChannel's Pipeline
PlatformDependent
NioEventLoop
((ChannelInboundHandler) handler()).channelRegistered(this);
p.addLast(new ChannelInitializer<Channel>() {...})
javaChannel()
Initialize the ServerChannel's pipeline
Loop to add the handlers
Put the task into the TaskQueue for asynchronous execution
int localRead = doReadMessages(readBuf);
Add the ChannelInitializer to the Pipeline; it is invoked when the Channel is registered
offerTask(task)
AbstractChannel
Client
NioEventLoopGroup
ch.configureBlocking(false);
NIO API: register the client's read/write events
Execute task.run
Initialize the default thread count
NioEventLoop.run
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
return new DefaultChannelPipeline(this);
return;
Runnable task = pollTaskFrom(taskQueue);
processSelectedKeysOptimized();
workerGroup
ServerBootstrap
Jump to the ChannelInitializer class, which is called when the Channel is registered; after the call completes
for (int i = 0; i < selectedKeys.size; ++i)
super(parent);
2
DEFAULT_EVENT_LOOP_THREADS
ChannelFuture regFuture = config().group().register(channel);
enableAutoReadTask = new Runnable() { @Override public void run() { channel.config().setAutoRead(true); } };
next.invokeChannelRegistered();
return ch
task.execute(); task = task.next;
Selector (epoll_create)
pipeline.fireChannelRegistered();
BaseMpscLinkedArrayQueueProducerFields
pipeline.fireChannelRead(readBuf.get(i));
For the source flow of SelectorProvider.provider(), see: Netty Core Source Code: NIO (Non-Blocking IO), synchronous non-blocking
for (int i = 0; i < nThreads; i ++)
1
ch.configureBlocking(false);
ServerSocketChannel Pipeline
callHandlerAddedForAllHandlers();
Callback: execute register0(promise);
3
Initialize NioServerSocketChannel (a wrapper around ServerSocketChannel)
eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } });
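The call above hands register0 to the event loop as a queued task: offer the Runnable, lazily start the single loop thread, and let that one thread drain the queue. A JDK-only sketch of the mechanism (MiniEventLoop is a simplified stand-in for SingleThreadEventExecutor, not the real class):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified stand-in for SingleThreadEventExecutor.execute():
// offer the task, start the single loop thread on first use, and let
// that thread run tasks forever.
public class MiniEventLoop {
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();

    public void execute(Runnable task) {
        taskQueue.offer(task);                        // addTask(task)
        if (started.compareAndSet(false, true)) {     // startThread() only once
            Thread t = new Thread(() -> {             // doStartThread()
                for (;;) {                            // SingleThreadEventExecutor.this.run()
                    try {
                        taskQueue.take().run();       // drain and run queued tasks
                    } catch (InterruptedException e) {
                        return;
                    }
                }
            });
            t.setDaemon(true);
            t.start();
        }
    }

    public static void main(String[] args) throws Exception {
        MiniEventLoop loop = new MiniEventLoop();
        CountDownLatch done = new CountDownLatch(1);
        // Analogous to eventLoop.execute(() -> register0(promise)):
        loop.execute(done::countDown);
        System.out.println(done.await(5, TimeUnit.SECONDS)); // true
    }
}
```

The key property this preserves is Netty's threading model: the task always runs on the loop's own thread, so register0 never races with other channel operations.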
TaskQueue
Callback: execute register0
addTask(task);
NioByteUnsafe
register
ranTasks = runAllTasks();
Bind the network listening port
ChannelPipeline p = channel.pipeline();
bossGroup
child.pipeline().addLast(childHandler);
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue() : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
pipeline = newChannelPipeline();
return provider.openServerSocketChannel();
NioMessageUnsafe
AbstractNioChannel
new NioServerSocketChannel
Get a task
Add ServerBootstrapAcceptor to the Pipeline
Set non-blocking
super(parent);
SocketChannel registration follows the same logic as ServerSocketChannel registration; once registered, the ChannelInitializer inside the SocketChannel is invoked and puts all of our own Handlers into the Pipeline
safeExecute(task);
newTaskQueue(queueFactory)
initChannel(final Channel ch)
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
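This line is the generic round-robin chooser: an AtomicInteger counter selects the next executor in the group. A self-contained sketch (RoundRobinChooser is an illustrative stand-in for GenericEventExecutorChooser):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Simplified chooser: an AtomicInteger counter drives round-robin
// selection over the group's executors, same formula as the source line.
public class RoundRobinChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final String[] executors;

    public RoundRobinChooser(String[] executors) { this.executors = executors; }

    public String next() {
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }

    public static void main(String[] args) {
        RoundRobinChooser c = new RoundRobinChooser(new String[] {"loop-0", "loop-1"});
        System.out.println(c.next() + " " + c.next() + " " + c.next() + " " + c.next());
        // loop-0 loop-1 loop-0 loop-1
    }
}
```

When the executor count is a power of two, Netty instead uses a bitmask variant, idx.getAndIncrement() & (length - 1), which avoids the modulo.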
Call the handlerAdded method of every Handler added via pipeline.addLast inside the childHandler
pipeline.invokeHandlerAddedIfNeeded(); callback
for (;;)
AbstractUnsafe
handler().handlerAdded(this);
return (EventLoop) super.next();
Register the accepted socket on the selector of one thread in the workerGroup
DefaultThreadFactory
return bind(new InetSocketAddress(inetPort));
Defaults to twice the machine's logical core count
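Ignoring the io.netty.eventLoopThreads system-property override that Netty also consults, the fallback computation can be sketched as (DefaultThreads is an illustrative class name):

```java
// How DEFAULT_EVENT_LOOP_THREADS falls back when nThreads is 0:
// twice the number of logical cores reported by the JVM, floored at 1.
public class DefaultThreads {
    public static int defaultEventLoopThreads() {
        return Math.max(1, Runtime.getRuntime().availableProcessors() * 2);
    }

    public static void main(String[] args) {
        System.out.println(defaultEventLoopThreads());
    }
}
```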
MultithreadEventExecutorGroup
NioServerSocketChannel
NIO code: ServerSocketChannel.open();
for (ChannelHandler h: handlers)
ServerBootstrapAcceptor
Listen for IO events; block waiting for events that need handling
return chooser.next();
childHandler is the ChannelInitializer we wrote ourselves in the Netty server startup code
EventLoopGroup group = new NioEventLoopGroup();
unsafe = newUnsafe();
final ChannelPipeline pipeline = ch.pipeline();
AbstractNioChannel
Call p.addLast → ChannelInitializer → initChannel
Wrap the SocketChannel into a NioSocketChannel
return next().register(channel);
channel = channelFactory.newChannel();
init(channel);
readBuf holds all the SocketChannels accepted via OP_ACCEPT events
return taskQueue.offer(task);
AbstractChannelHandlerContext.invokeChannelRegistered(head);
idx is an AtomicInteger used for round-robin selection
Loop over all keys (events) in selectedKeys
return doBind
ch.eventLoop().execute(new Runnable() {×××}
Runs when timeoutMillis expires or an event occurs
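That behavior is plain JDK Selector semantics: select(timeoutMillis) returns when an event arrives, when the timeout expires, or immediately if a wakeup() is pending, which is how Netty unblocks the loop to run queued tasks. A minimal demonstration (SelectWakeup is an illustrative name):

```java
import java.nio.channels.Selector;

// A pending wakeup() makes the next select() return immediately instead
// of blocking; with no channels registered, zero keys are ready.
public class SelectWakeup {
    public static int selectAfterWakeup() throws Exception {
        try (Selector selector = Selector.open()) {
            selector.wakeup();        // record a pending wakeup
            return selector.select(); // returns immediately with 0 ready keys
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(selectAfterWakeup()); // 0
    }
}
```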