Buffer
2019-12-06 15:14:07 0 举报
异步io 类图
作者其他创作
大纲/内容
buffer
SendHeadFrame
- byte[] body;
写模式
创建SelectorProvider 和 ChannelAdapter
AsyncSendDispatcher
+ queue:Queue<SendPacket> = new ConcurrentLinkedQueue(); // 发送队列+ sender: Sender; // 发送执行者,SocketChannelAdapter- isSending:AtomicBoolean;- isClosed: AtomicBoolean;- queueLock:Object // 队列锁- reader: AsyncReader
+ send(SendPacket packet):void //放入队列 + cance(SendPacket packet): void // 取消发送,从队列里删除+ takePacket(): SendPacket // 拿packet+ completePacket(SendPacket packet);+ close(): void // 关闭reader+ provideIOArgs(): IOArgs // 通过reader读取frame来提供IOArgs+ onConsumeFailed(IOArg ioArgs): void+ onConsumeCompleted(IOArgs ioArgs) // 发送下一个packet
读消息
获取channel
发送调度者;作用:1.缓存发送队列;2.通过队列,最终调用sender来发送;3.实现对数据的包装,在header里封装数据包;
BaseConnector
socketChannel:SocketChannelsender:Sender:SocketChannelAdapterreceiver:Receiver:SocketChannelAdaptersendDispatcher: SendDispatcherreceiveDispatcher: ReceiveDispatcher- receivePacketCallback: ReceivePacketCallback 接收回调
setup(SocketChannel socketChannel): void 声明变量,调用receiveDispatcher的startReceiveonReceiveNewMessage(String msg): void 抽象方法send(String msg): void 发送字符串send(Packet pakcet): void
开启线程循环
IOSelectorProvider
startRead():voidstartWrite():void- handleSelection(params):voidregisterInput(params):booleanregisterOutput(params):boolean- registerSelection(params):SelectionKeyunRegisterInput(params):voidunRegisterOutput(params):void
HandleInputCallback
canProviderInput():void
查询selector
count >0?
Server
开启SockertChannel
CmdCancelSendFrame
+ attribute1:type = defaultValue+ attribute2:type- attribute3:type
+ operation1(params):returnType- operation2(params)- operation3()
设置非阻塞
Packet<T extends Closable>
+ stream: Stream;- length: long;
+ open(): T; 调用抽象的createStream+ close(): void;+ byte(): byte[];+ createStream(): Stream 抽象方法;+ closeStream(Stream stream): void 默认关闭stream流+ headerInfo(): byte[] 提供而外头部信息
Capacity
是否大于0
WriteableByteChannel
n
AsyncPacketReader
- provider: PacketProvider- ioArgs: IOArgs;- node: BytePriorityNode<Frame>- nodeSize: int
+ fillData(): IOArgs
ClientHandler
connector:Connector 匿名内部类socketChannel:SocketChannelclientHandlerCallback: ClientHandlerCallback
<<java.lang.Runnable>>
BaseSendFrame
+ headRemaning:int = FRAME_HEAD_LENGTH+ bodyRemaning:int
+ handle(IOArgs ioArgs): boolean 有具体实现处理当前packet- consumeBody(IOArgs ioArgs): int 抽象方法- consumeHead(IOArgs ioArgs): int 有具体实现从ioargs里读取 headRemaning
遍历selectionKey
ClientHandlerCallback
注册OP_WRITE事件
Position
StringSendPacket
- byte[] bytes;
+ StringSendPacket(String str): StringSendPacket+ bytes(): byte[];
setup方法创建并且启动sender和receiver;send方法:将str组装成packet,最后交给ioArgs来发送;receive方法:将packet数据拆分成ioargs;
<<Closeable>>
StringReceivePacket
- buffer: byte[]; 用来装读取到数据的buffer- int position; 保存位置
<<ReceiveDispatcher>>
+ start(): void+ stop(): void
TCPClient
send() 主动发起setup(socketChannel)onReceiveNewMessage(String msg)
setup(socketChannel)onReceiveNewMessage(String msg)
开启线程
Server端
处理selectionKey
0 1 2 3 4 5
Limit
BytePriorityNode
priority: byte;item: Item; 实际保存的数据next: BytePriorityNode<Item> 下一帧数据
+ appendNode(): void
IOArgs 封装buffer的操作
byteBuffer:byte[] 缓冲buffer:ByteBuffer
read(socketChannel): intwrite(socketChannel):intbufferString():String
Packet规则
SendPacket 发送包
+ isCanceled:boolean= false
+ bytes(): byte[]+ isCanceled: boolean
<<ReceivePacketCallback>>
InputStream
循环read
发送流程
注册selector OP_ACCEPT事件
bytes: byte[];
+ createStream(): ByteArrayInputStream 提供输入流,最终输出给channel buffer
新建clientHandler
Frame
+ TYPE_PACKET_HEADER: byte = 11+ TYPE_PACKET_ENTITY: byte = 12+ TYPE_COMMAND_SEND_CANCEL: byte = 41+ TYPE_COMMAND_RECEIVE_REJECT: byte = 42- header: byte[]
通过buffer发送消息
<<Receiver>>
receiveAsync(IOArgs.IOArgsEventListener listener):void
bytes: byte[] 缓冲byteBuffer :ByteBuffer
yes
注册 OP_CONNECT事件
接受调度者;作用:1.接收的数据,将多个ioargs封装成1个packet;2.通过ReceivePacketCallback来接受数据包;3.start方法:4.stop方法
AsyncReceiverDispatcher
ioargs: IoArgsreceiver:Receiverpacket:ReceivePacketcallback:ReceivePacketCallback 接收数据的回调
start(): void 调用receiver的receiveAsync方法stop(): void 调用receiver的stop方法
TCPServer逻辑
IOArgs是用来封装数据发送的地方
msg1-a
读模式
Provider作用:1.向selector里注册channel需要监听的事件2.维护callbackMap3.事件调度
遍历selector
body
注册OP_READ事件
取消对key的监听
移除selectionKey
通道适配器适配器模式:输入不同,输出相同这个类作用:实际上,sendAsync和receiveAsync都是通过IOProvider向selector注册channel的监听事件;最后通过调用回调函数向外传播消息;回调的callback就是从Provider里获取到的
Client
msg1-b
FileSendPacket
- file: File;- length: int;
启动clientHandler
1
Adapter里最终调用的就是Provider的方法
<<java.io.Closeable>>
拿到callback
AsyncPacketWriter
Channel.write
服务端逻辑
handleSelection
声明selector和channel
- canceled: boolean
+ isCanceled(): void;+ cancel(): void; 取消
开始
connector:setup
File/String
string: Stringlength: Long
StringReceivePacket(long len) 构造方法buildEntity(ByteArrayOutputStream stream): String 返回一个String对象createStream(long length): ByteArrayOutputStream; 提供接收的数据流,用来接收buffer
读逻辑
Channel.read
String
ReadableByteChannel
关联
开启ServerSockertChannel
TCPServer
循环
SendEntityFrame
- channel: ReadableChannel;- unConsumedEntityLength: long; // 未消费body长度
Frame规则
Y
+ queue:Queue<SendPacket> = new ConcurrentLinkedQueue();+ sender: Sender;- isSending:AtomicBoolean;- ioArgs: IOArgs;- total: int;- position: int;
+ send(SendPacket packet):void 放入队列
OutputStream
5
第二阶段,解决粘包拆包问题
写逻辑
SocketChannelAdapter
channel:SocketChannelioProvider:IOProviderreceiveIOEventListener: IOArgs.IOArgsEventListenersendIOEventListener: IOArgs.IOArgsEventListenerinputCallback: HandleInputCallback 输入回调outputCallback: HandleOutputCallback 输出回调
receiveAsync(params):booleansendAsync(params):boolean
发送失败
读取消息
接收流程
注册OP_READ
Client端
<<Sender>>
msg1
HandleOutputCallback
attach:Object IOArgs
setAttach():voidcanProviderOutput(params):void
包头 (4byte)
connector作用:封装了sender、receiver、sendDispatcher、receiverDispatcher;对外提供了send和close方法也提供了receiver的回调逻辑
startRead逻辑
Packet
type:bytelength:int
+ byte():byte+ length():int
<<IOProvider>>
registerInput(params):booleanregistOutput(params):booleanunRegistInput(params):voidunRegistOutput(params):void
注册OP_ACCEPT
ReceivePacket 接收包
查询selector keys
limit: intbyteBuffer: ByteBuffer 缓冲区
read(socketChannel): int 读取channelwrite(socketChannel): int 将缓存中数据写入channelreadFrom(ReadableByteChannel channel): int 将String等数据转化为byteChannel,最终读取到buffer里writeFrom(WritableByteChannel channel): int 将channel的数据读取到buffer,最终通过channel输出到outputStreamsetLimit(int limit): void 设置数据包的大小;writeLength(int total) :void 在首包里,写入数据包长度readLength(): int capacity(): int 返回byteBuffer容量
调用ioArgs去发送数据
调用线程池执行callback
是否已关闭Provider
<<SendDispatcher>>
+ send(SendPacket packet):void+ cancel(SendPacket packet):void
0
BaseSendPacketFrame
packet: SendPacket<?> 当前对应的发送packet
keys > 0 ?
flip
<<IOArgsEventListener>>
onStarted(IOArgs):voidonCompleted(IOArgs):void
第4次,文件分片发送
1.如果postion > total 说明发送完了2. 如果position = 0. 写入包体长度
FileReceivePacket
file: File
Connector
socketChannel:SocketChannelsender:Sender:SocketChannelAdapterreceiver:Receiver:SocketChannelAdapterechoReceiveListener: IOArgsEventListener
setup(SocketChannel socketChannel):void readNextMessage():void 调用receiver的receiveAsync方法onReceiveNewMessage(String str):voidsend(String msg):void ——> StringSendPacketsend(SendPacket packet):void
客户端逻辑
包体
创建Connector
收藏
收藏
0 条评论
回复 删除
下一页