ddmq producer
2020-08-03 14:02:48 0 举报
ddmq producer流程图
作者其他创作
大纲/内容
监听到PProxy配置变更
异步执行ClusterProducer.send()
如果绑定broker信息中的proxy不包含当前proxy(同一cluster的topic可以单独指定proxy)
是
MetricUtils.统计qps
注册异常handler跟shutdownHook
创建CarreraRequest
初始化producer生产者池
是否合法
否
metricUtils记录message 类型以及topic请求次数
通过path参数加载proxyConfig(里面保存了broker的列表,以及broker的配置)
调用ProducerPool send
成功?
更新requestLimiter
根据remoteConfigInfo获取zk地址从远端加载proxyConfig
返回FAIL_REFUSED_BY_RATE_LIMITER
检查是否被限速
更新producerPool中producerManager的producer池
初始化producerPool
与configmanager保存的topic对比
通过path参数加载remoteConfigInfo
返回FAIL_NO_PRODUCER_FOR_CLUSTER
校验request(key < 255?tags.length <255)
brokers.size > 1
结束
获取pproxy的topic列表
返回ASYNC
根据broker从producerPool获取到ClusterProducer
本地模式?
初始化ConfigManager
放入重试队列重试次数+1并返回ASYNC
index = msg.partitionId > 0 ? msg.partitionId:msg.partitionId == -1? msg.hashId:msg.key.hashCode()返回brokers.get(index)
返回FAIL_TOPIC_NOT_ALLOWED
broker=brokers.get(0)
从zk读取新增的topic的配置,
MetricUtils统计dropCounter
创建proxyApp并启动proxyApp
启动ProducerProxy
返回FAIL_ILLEGAL_MSG
topic不存在or brokers为空
DropLogger记录message
查询configManager中保存的topic配置
根据异常错误设置相应的ERRORCODE
异步逻辑因producer类型而异,总体都是构造响应的message,发送,成功则执行onFinish回调,失败则按照响应错误设置ERRORCODE
获取成功
topic与当前proxy不是同一个cluster or topic没有与任何broker绑定
thrift server接收到日志请求
执行回调
监听到TopicConfig变更
metric中注册新得topic
初始化producer的topic metric(topic发送指标,失败指标)
删除多余的topic
停止timeouthandle
初始化tpslimiter(每个topic分别有两个limiter)
循环遍历topic中关联的broker
检查是否超过限速次数
MetricUtils.统计响应时间
执行的流程与topicConfig变更类似
删除configmanager中相关的broker/topic(可能之前这个topic绑定了当前proxy)
初始化thrift server
获取pproxy的broker列表
添加/更新configManager中topic/broker信息
删除configManager中该topic已经失效的topic/broker的记录
返回不是ASYNC
收藏
收藏
0 条评论
下一页