Netty实现IM
2021-01-12 14:29:44 0 举报
AI智能生成
netty实现IM
作者其他创作
大纲/内容
服务端启动流程
基本流程
线程模型
.group(bossGroup, workerGroup)
IO 模型 NIO
.channel(NioServerSocketChannel.class)
连接读写处理逻辑
.childHandler()
自动绑定递增端口
serverBootstrap.bind(8000) 异步方法,调用立即返回ChannelFuture
ChannelFuture添加监听器GenericFutureListener,可以监听是否绑定端口成功
自动绑定端口,失败port+1
serverBootstrap.handler()
指定在服务端启动过程中的逻辑
serverBootstrap.attr()
指定自定义属性,通过channel.attr()取出
serverBootstrap.childAttr()
可以给每一条连接指定自定义属性,通过channel.attr()取出
childOption()
可以给每条连接设置一些TCP底层相关的属性
ChannelOption.SO_KEEPALIVE
是否开启TCP底层心跳机制,true为开启
ChannelOption.TCP_NODELAY
是否开启Nagle算法,true表示关闭,false表示开启
如果要求高实时性,有数据发送时就马上发送,就关闭,如果需要减少发送次数减少网络交互,就开启
option()
服务端channel设置属性
ChannelOption.SO_BACKLOG
表示系统用于临时存放已完成三次握手的请求的队列的最大长度,
如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
客户端启动流程
客户端启动的引导类是 Bootstrap
失败重连
option()
ChannelOption.CONNECT_TIMEOUT_MILLIS
连接的超时时间,超过这个时间还是建立不上的话则代表连接失败
ChannelOption.SO_KEEPALIVE
是否开启 TCP 底层心跳机制,true 为开启
ChannelOption.TCP_NODELAY
是否开启Nagle算法,true表示关闭,false表示开启
如果要求高实时性,有数据发送时就马上发送,就关闭,如果需要减少发送次数减少网络交互,就开启
客户端服务端双向通信
客户端->服务端
ch.pipeline()添加逻辑处理器
返回的是和这条连接相关的逻辑处理链,采用了责任链模式
继承自 ChannelInboundHandlerAdapter,覆盖了 channelActive()
客户端连接建立成功之后,调用到 channelActive()
Netty数据以ByteBuf 为单位,数据的读写都是 ByteBuf
客户端读数据,覆盖方法 channelRead()
服务端->客户端
ch.pipeline(),添加逻辑处理器
继承自ChannelInboundHandlerAdapter,覆盖的方法 channelRead(),读数据
服务端回复数据
ByteBuf
结构
1.以上三段内容是被两个指针给划分出来的,从左到右,依次是读指针(readerIndex)、写指针(writerIndex),
然后还有一个变量 capacity,表示 ByteBuf 底层内存的总容量
然后还有一个变量 capacity,表示 ByteBuf 底层内存的总容量
2.从 ByteBuf 中每读取一个字节,readerIndex 自增1,ByteBuf 里面总共有 writerIndex-readerIndex 个字节可读,
由此可以推论出当 readerIndex 与 writerIndex 相等的时候,ByteBuf 不可读
由此可以推论出当 readerIndex 与 writerIndex 相等的时候,ByteBuf 不可读
3.写数据是从 writerIndex 指向的部分开始写,每写一个字节,writerIndex 自增1,直到增到 capacity,这个时候,表示 ByteBuf 已经不可写了
4.ByteBuf 里面其实还有一个参数 maxCapacity,当向 ByteBuf 写数据的时候,如果容量不足,那么这个时候可以进行扩容,
直到 capacity 扩容到 maxCapacity,超过 maxCapacity 就会报错
直到 capacity 扩容到 maxCapacity,超过 maxCapacity 就会报错
三种模式
堆缓冲区模式(Heap Buffer)
支撑数组(backing array)。将数据存放在JVM的堆空间,通过将数据存储在数组中实现
优点
由于数据存储在Jvm堆中可以快速创建和快速释放,并且提供了数组直接快速访问的方法
缺点
每次数据与I/O进行传输时,都需要将数据拷贝到直接缓冲区
直接缓冲区模式(Direct Buffer)
堆外分配的直接内存,不会占用堆的容量。
适用于套接字传输过程,避免了数据从内部缓冲区拷贝到直接缓冲区的过程,性能较好
适用于套接字传输过程,避免了数据从内部缓冲区拷贝到直接缓冲区的过程,性能较好
优点
使用Socket传递数据时性能很好,避免了数据从Jvm堆内存拷贝到直接缓冲区的过程。提高了性能
缺点
相对于堆缓冲区而言,Direct Buffer分配内存空间和释放更为昂贵
对于涉及大量I/O的数据读写,建议使用Direct Buffer。
而对于用于后端的业务消息编解码模块建议使用Heap Buffer
而对于用于后端的业务消息编解码模块建议使用Heap Buffer
复合缓冲区模式(Composite Buffer)
本质上类似于提供一个或多个ByteBuf的组合视图,可以根据需要添加和删除不同类型的ByteBuf。
内存管理
Netty 的 ByteBuf 是通过引用计数的方式管理的,如果一个 ByteBuf 没有地方被引用到,需要回收底层内存
默认情况下,当创建完一个 ByteBuf,它的引用为1,然后每次调用 retain() 方法, 它的引用就加一,
release() 方法原理是将引用计数减一,减完之后如果发现引用计数为0,则直接回收 ByteBuf 底层的内存。
release() 方法原理是将引用计数减一,减完之后如果发现引用计数为0,则直接回收 ByteBuf 底层的内存。
多个 ByteBuf 可以引用同一段内存,通过引用计数来控制内存的释放,遵循谁 retain() 谁 release() 的原则
slice()、duplicate() 共用同一个内存
copy() 新的内存
retainedSlice() 与 retainedDuplicate() 同一个内存增加引用计数次数
通信协议
协议设计
魔数,固定几个字节
判断是否遵循自定义协议,类似Java 的字节码二进制文件开头的 4 个字节为0xcafebabe
版本号,预留字段
类似ipv4、ipv6
序列化算法
表示如何把 Java 对象转换二进制数据以及二进制数据如何转换回 Java 对象,
比如 Java 自带的序列化,json,hessian 等序列化方式
比如 Java 自带的序列化,json,hessian 等序列化方式
指令
服务端或者客户端每收到一种指令都会有相应的处理逻辑,这里,我们用一个字节来表示,
最高支持256种指令,对于我们这个 IM 系统来说已经完全足够了
最高支持256种指令,对于我们这个 IM 系统来说已经完全足够了
数据长度
数据内容
编码
封装成二进制的过程
1.创建一个 ByteBuf,ioBuffer() 方法会返回适配 io 读写相关的内存
尽可能创建一个直接内存,直接内存可以理解为不受 jvm 堆管理的内存空间,
写到 IO 缓冲区的效果更高
写到 IO 缓冲区的效果更高
2.将 Java 对象序列化成二进制数据包
3.逐个往 ByteBuf 写入字段,即实现了编码过程
解码
解析java对象
1.校验魔数和版本
2.调用 ByteBuf 的 API 分别拿到序列化算法标识、指令、数据包的长度
3.拿到的数据包的长度取出数据,通过指令拿到该数据包对应的 Java 对象的类型,
根据序列化算法标识拿到序列化对象,将字节数组转换为 Java 对象
根据序列化算法标识拿到序列化对象,将字节数组转换为 Java 对象
pipeline 与 channelHandler
分类
ChannelInboundHandler
处理读数据的逻辑
channelRead()
addLast A B C,处理顺序 A,B,C
super.channelRead(ctx, msg);自动传到下一个handler
ctx.fireChannelRead(packet) 解码后传递给下一个handler
ChannelOutBoundHandler
处理写数据的逻辑
write()
调用父类的 write() 方法,会自动调用到下一个 outBoundHandler 的 write()
addLast A B C,处理顺序 C,B,A
默认情况下会把读写事件传播到下一个 handler
构建客户端与服务端 pipeline
ByteToMessageDecoder
Netty 会自动进行内存的释放
ByteBuf in
传递进来就是ByteBuf 类型
List 类型
往 List 里面添加解码后的结果对象,自动实现结果往下一个 handler 进行传递
实现自定义解码,而不用关心 ByteBuf 的强转和 解码结果的传递
SimpleChannelInboundHandler
继承这个类,同时传递一个泛型参数
channelRead0()
不用判断是否是本 handler 可以处理的对象,
也不用强转,不用往下传递本 handler 处理不了的对象
也不用强转,不用往下传递本 handler 处理不了的对象
可以实现每一种指令的处理,不再需要强转,不再有冗长乏味的 if else 逻辑,不需要手动传递对象
MessageToByteEncoder
将对象转换到二进制数据
不需要每一次将响应写到对端的时候调用一次编码逻辑进行编码,也不需要自行创建 ByteBuf
可以实现自定义编码,而不用关心 ByteBuf 的创建,不用每次向对端写 Java 对象都进行一次编码
拆包粘包理论与解决方案
为什么会有粘包半包现象?
一种是多个字符串“粘”在了一起,我们定义这种 ByteBuf 为粘包
一种是一个字符串被“拆”开,形成一个破碎的包,我们定义这种 ByteBuf 为半包
应用层是按照 ByteBuf 为 单位来发送数据,底层操作系统仍然是按照字节流发送数据
服务端读到字节流拼成 ByteBuf时 与客户端的发送数据并不对等
服务端读到字节流拼成 ByteBuf时 与客户端的发送数据并不对等
拆包原理
1.如果当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,
继续从 TCP 缓冲区中读取,直到得到一个完整的数据包。
继续从 TCP 缓冲区中读取,直到得到一个完整的数据包。
2.如果当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,
构成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接。
构成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接。
Netty 自带的拆包器
FixedLengthFrameDecoder固定长度拆包器
如果你的应用层协议非常简单,每个数据包的长度都是固定的,比如 100,那么只需要把这个拆包器加到 pipeline 中,
Netty 会把一个个长度为 100 的数据包 (ByteBuf) 传递到下一个 channelHandler。
Netty 会把一个个长度为 100 的数据包 (ByteBuf) 传递到下一个 channelHandler。
LineBasedFrameDecoder 行拆包器
从字面意思来看,发送端发送数据包的时候,每个数据包之间以换行符作为分隔,
接收端通过 LineBasedFrameDecoder 将粘过的 ByteBuf 拆分成一个个完整的应用层数据包。
接收端通过 LineBasedFrameDecoder 将粘过的 ByteBuf 拆分成一个个完整的应用层数据包。
DelimiterBasedFrameDecoder 分隔符拆包器
行拆包器的通用版本,可以自定义分隔符
LengthFieldBasedFrameDecoder 基于长度域拆包器
最后一种拆包器是最通用的一种拆包器,只要你的自定义协议中包含长度域字段,均可以使用这个拆包器来实现应用层拆包。
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 7, 4);
第一个参数指的是数据包的最大长度
第二个参数指的是长度域的偏移量
第三个参数指的是长度域的长度
拒绝非本协议连接
//ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 7, 4));
// 替换为
ch.pipeline().addLast(new Spliter());
// 替换为
ch.pipeline().addLast(new Spliter());
decode() 方法中,第二个参数 in,每次传递进来的时候,均为一个数据包的开头
hannelHandler 的生命周期
handlerAdded() -> channelRegistered() -> channelActive() -> channelRead() -> channelReadComplete()
channelInactive() -> channelUnregistered() -> handlerRemoved()
channelInactive() -> channelUnregistered() -> handlerRemoved()
handlerAdded()
指的是当检测到新连接之后,调用 ch.pipeline().addLast(new LifeCyCleTestHandler()); 之后的回调,
表示在当前的 channel 中,已经成功添加了一个 handler 处理器
表示在当前的 channel 中,已经成功添加了一个 handler 处理器
channelRegistered()
表示当前的 channel 的所有的逻辑处理已经和某个 NIO 线程(NioEventLoop)建立了绑定关系
channelActive()
当 channel 的所有的业务逻辑链准备完毕(也就是说 channel 的 pipeline 中已经添加完所有的 handler)
以及绑定好一个 NIO 线程之后,这条连接算是真正激活了,接下来就会回调到此方法
以及绑定好一个 NIO 线程之后,这条连接算是真正激活了,接下来就会回调到此方法
channelRead()
客户端向服务端发来数据,每次都会回调此方法,表示有数据可读
channelReadComplete()
服务端每次读完一次完整的数据之后,回调该方法,表示数据读取完毕
channelInactive()
表面这条连接已经被关闭了,这条连接在 TCP 层面已经不再是 ESTABLISH 状态了
channelUnregistered()
既然连接已经被关闭,那么与这条连接绑定的线程就不需要对这条连接负责了,
这个回调就表明与这条连接对应的 NIO 线程移除掉对这条连接的处理
这个回调就表明与这条连接对应的 NIO 线程移除掉对这条连接的处理
handlerRemoved()
给这条连接上添加的所有的业务逻辑处理器都给移除掉
区别
handlerAdded() 与 handlerRemoved()
通常可以用在一些资源的申请和释放
channelActive() 与 channelInActive()
TCP 连接的建立与释放
可以在 channelActive(),实现对客户端连接 ip 黑白名单的过滤
channelRead()
每次读到一定的数据,都会累加到一个容器里面,然后判断是否能够拆出来一个完整的数据包,
如果够的话就拆了之后,往下进行传递
如果够的话就拆了之后,往下进行传递
channelReadComplete()
每次向客户端写数据的时候,都通过 writeAndFlush() 的方法写并刷新到底层
以在之前调用 writeAndFlush() 的地方都调用 write() 方法,然后在channelReadComplete里面调用 ctx.channel().flush() 方法,相当于一个批量刷新的机制
以在之前调用 writeAndFlush() 的地方都调用 write() 方法,然后在channelReadComplete里面调用 ctx.channel().flush() 方法,相当于一个批量刷新的机制
使用 channelHandler 的热插拔实现客户端身份校验
新增身份认证AuthHandler
AuthHandler
判断如果已经经过权限认证,那么就直接调用 pipeline 的 remove() 方法删除自身,
这里的 this 指的其实就是 AuthHandler 这个对象,删除之后,这条客户端连接的逻辑链中就不再有这段逻辑了
否则,关闭连接
这里的 this 指的其实就是 AuthHandler 这个对象,删除之后,这条客户端连接的逻辑链中就不再有这段逻辑了
否则,关闭连接
客户端互聊原理与实现
一对一单聊原理
1.如上图,A 要和 B 聊天,首先 A 和 B 需要与服务器建立连接,然后进行一次登录流程,服务端保存用户标识和 TCP 连接的映射关系。
2.A 发消息给 B,首先需要将带有 B 标识的消息数据包发送到服务器,
然后服务器从消息数据包中拿到 B 的标识,找到对应的 B 的连接,将消息发送给 B
然后服务器从消息数据包中拿到 B 的标识,找到对应的 B 的连接,将消息发送给 B
群聊的发起与通知
群聊原理
1.A,B,C 依然会经历登录流程,服务端保存用户标识对应的 TCP 连接
2.A 发起群聊的时候,将 A,B,C 的标识发送至服务端,服务端拿到之后建立一个群聊 ID,
然后把这个 ID 与 A,B,C 的标识绑定
然后把这个 ID 与 A,B,C 的标识绑定
3.群聊里面任意一方在群里聊天的时候,将群聊 ID 发送至服务端,服务端拿到群聊 ID 之后,取出对应的用户标识,
遍历用户标识对应的 TCP 连接,就可以将消息发送至每一个群聊成员
遍历用户标识对应的 TCP 连接,就可以将消息发送至每一个群聊成员
群聊消息的收发及 Netty 性能优化
共享 handler
@ChannelHandler.Sharable
仿照 Netty 源码里面单例模式的写法,构造一个单例模式的类
每来一次新的连接,添加 handler 的时候就不需要每次都 new 了
压缩 handler - 合并编解码器
MessageToMessageCodec
编解码一个handler处理
无状态的 handler,使用单例模式来实现
handler越长,在事件传播过程中性能损耗会被逐渐放大
每个 Packet 对象都要在每个 handler 上经过一遍
缩短事件传播路径
压缩 handler - 合并平行 handler
一个指令只会执行一个handler,将指令处理handler压缩成一个
IMHandler 是无状态的,可以使用单例的对象
定义一个 map,存放指令到各个指令处理器的映射
每次回调到 IMHandler 的 channelRead0() 方法,找到对应的handler,
调用指令 handler 的 channelRead,内部会做指令类型转换,最终调用到每个指令 handler 的 channelRead0() 方法
调用指令 handler 的 channelRead,内部会做指令类型转换,最终调用到每个指令 handler 的 channelRead0() 方法
IMHandler 和指令 handler 均为单例模式,单机十几万甚至几十万的连接情况下,
性能能得到一定程度的提升,创建的对象也大大减少了
性能能得到一定程度的提升,创建的对象也大大减少了
更改事件传播源
ctx.writeAndFlush()
从 pipeline 链中的当前节点开始往前找到第一个 outBound 类型的 handler 把对象往前进行传播
如果这个对象确认不需要经过其他 outBound 类型的 handler 处理,就使用这个方法
ctx.channel().writeAndFlush()
从 pipeline 链中的最后一个 outBound 类型的 handler 开始,把对象往前进行传播
如果确认当前创建的对象需要经过后面的 outBound 类型的 handler,那么就调用此方法
减少阻塞主线程的操作
channelRead0()中避免耗时操作
Netty 在启动的时候会开启 2 倍的 cpu 核数个 NIO 线程
通常情况下我们单机会有几万或者十几万的连接,一条 NIO 线程会管理着几千或几万个连接
单条 NIO 线程的处理逻辑原理
只要有一个 channel 的一个 handler 中的 channelRead0() 方法阻塞了 NIO 线程,
最终都会拖慢绑定在该 NIO 线程上的其他所有的 channel
最终都会拖慢绑定在该 NIO 线程上的其他所有的 channel
耗时操作丢到线程池处理
避免一些耗时的操作影响 Netty 的 NIO 线程,从而影响其他的 channel
如何准确统计处理时长
writeAndFlush()非 NIO 线程执行是异步操作
调用之后,其实是会立即返回的,剩下的所有的操作,都是 Netty 内部有一个任务队列异步执行的
耗时统计增加listener
心跳与空闲检测
连接假死
在某一端(服务端或者客户端)看来,底层的 TCP 连接已经断开了,但是应用程序并没有捕获到,
因此会认为这条连接仍然是存在的,从 TCP 层面来说,只有收到四次握手数据包或者一个 RST 数据包,连接的状态才表示已断开
因此会认为这条连接仍然是存在的,从 TCP 层面来说,只有收到四次握手数据包或者一个 RST 数据包,连接的状态才表示已断开
问题
对于服务端来说,因为每条连接都会耗费 cpu 和内存资源,大量假死的连接会逐渐耗光服务器的资源,最终导致性能逐渐下降,程序奔溃。
对于客户端来说,连接假死会造成发送数据超时,影响用户体验
造成的原因
1.应用程序出现线程堵塞,无法进行数据的读写。
2.客户端或者服务端网络相关的设备出现故障,比如网卡,机房故障
3.公网丢包。公网环境相对内网而言,非常容易出现丢包,网络抖动等现象,如果在一段时间内用户接入的网络连续出现丢包现象,
那么对客户端来说数据一直发送不出去,而服务端也是一直收不到客户端来的数据,连接就一直耗着。
那么对客户端来说数据一直发送不出去,而服务端也是一直收不到客户端来的数据,连接就一直耗着。
服务端空闲检测
IdleStateHandler
第一个表示读空闲时间,指的是在这段时间内如果没有数据读到,就表示连接假死
第二个是写空闲时间,指的是 在这段时间如果没有写数据,就表示连接假死
第三个参数是读写空闲时间,表示在这段时间内如果没有产生数据读或者写,就表示连接假死
最后一个参数表示时间单位
写空闲和读写空闲为0,表示我们不关心者两类条件
连接假死之后会回调 channelIdle()手动关闭连接
需要放在 pipeline 的最前面
如果插入到最后面的话,如果这条连接读到了数据,但是在 inBound 传播的过程中出错了或者数据处理完完毕就不往后传递了(我们的应用程序属于这类),那么最终 IMIdleStateHandler 就不会读到数据,最终导致误判
客户端定时发心跳
ctx.executor() 返回的是当前的 channel 绑定的 NIO 线程
可以隔一段时间之后执行一个任务
可以隔一段时间之后执行一个任务
服务端的空闲检测问题,服务端这个时候是能够在一定时间段之内关掉假死的连接,
释放连接的资源了,但是对于客户端来说,我们也需要检测到假死的连接
释放连接的资源了,但是对于客户端来说,我们也需要检测到假死的连接
服务端回复心跳与客户端空闲检测
为了排除是否是因为服务端在非假死状态下确实没有发送数据,服务端也要定期发送心跳给客户端
HeartBeatRequestHandler
服务端在收到心跳之后回复客户端,给客户端发送一个心跳响应包即可
客户端的空闲检测其实和服务端一样,依旧是在客户端 pipeline 的最前方插入 IMIdleStateHandler
通常空闲检测时间要比发送心跳的时间的两倍要长一些,这也是为了排除偶发的公网抖动,防止误判
0 条评论
下一页