DruidCoordinator源码解析
2023-03-27 14:41:49 0 举报
AI智能生成
登录查看完整内容
DruidCoordinator源码解析
作者其他创作
大纲/内容
在CliCoordinator中绑定,将配置中druid.coordinator开头的配置注入到该类
DruidCoordinatorConfig
在ServerModule中绑定,将配置中druid.zk.paths开头的配置注入到该类
ZkPathsConfig
在JacksonConfigManagerModule中绑定依赖,JacksonConfigManager依赖其他,比如ObjectMapper在JacksonModule中实现了注入的类型,包括Json.class Smile.class,默认为Json.class,并在DefaultObjectMapper中实现了Jackson多态序列化
JacksonConfigManager
在SegmentsMetadataManagerProvider中通过Provider方式注入,实际由SqlSegmentsMetadataManagerProvider提供
SegmentsMetadataManager
ServerInventoryView
MetadataRuleManager
Provider<CuratorFramework>
ServiceEmitter
ScheduledExecutorFactory
IndexingServiceClient
LoadQueueTaskMaster
ServiceAnnouncer
DruidNode
CoordinatorCustomDutyGroups
BalancerStrategyFactory
LookupCoordinatorManager
CompactSegments
ZkEnablementConfig
项目在启动时会注入一个CuratorDruidLeaderSelector,启动zk客户端
coordLeaderSelector.registerListener 注册Listener中 createNewLeaderLatchWithListener(),在LeaderLatchListener中回调 becomeLeader 和stopBeingLeader方法
在生命周期管理的start方法内注册一个listener,监听coordinator选主后的逻辑,成为leader后调用becomeLeader方法,不再是leader时调用stopBeingLeader方法
首先判断是否已经在周期性的从数据库中同步数据了,是则返回,否就继续
poll() 采用jdbi以流式的形式查询metadata表;生成dataSourcesSnapshot
segmentsMetadataManager.startPollingDatabasePeriodically(); SegmentsMetadataManager将数据同步到Coordinator的内存中,MetadataSegmentView 数据在Broker的内存中
创建一个默认的元数据规则,创建的默认规则为ForeverLoadRule, 其中加载到_default_tier,副本数为2
然后按照配置进行定期拉取配置: poll()
metadataRuleManager.start();
lookupManagementLoop()
lookupCoordinatorManager.start();
该方法当前已废弃,通过在CLICoordinator中启动时通过Discovery模块
serviceAnnouncer.announce(self);
DEBUG日志开启的前提下,在日志中打印出来所有在used状态下的segment
LogUsedSegments
获取到所有server,过滤出来可以充当广播和副本的server, 转换为ImmutableDruidServer
prepareCurrentServers()
将segmentsToDrop和segmentsToLoad合并,迭代处理batchSize个数据,加入到newRequests中
NettyHttpClient.go
调用httpclient.go方法
Load则从segmentsToLoad移除,Drop则从segmentsToDrop移除
onSuccess
失败了就打日志
onFailure
增加callback执行方法
从LoadQueueTaskMaster中取一个peon,然后启动,固定延迟执行doSegmentManagement()
startPeonsForNewServers(currentServers) 为每一个server启动一个LoadQueuePeon
因为currentServers为筛选后的节点,所以返回的DruidCluster中只有historical和bridge
内层循环为按historical遍历segment,放入segmentInCluster
内层循环为按queue中的segment遍历,放入loadingSegment
两层遍历,外层循环为按tier遍历historical
segmentReplicantLookup = SegmentReplicantLookup.make segment的副本lookup
UpdateCoordinatorStateAndPrepareCluster
获取到cluster,从DatasourcesSnapshot中获取到overshadowedSegment
更新DruidCoordinatorRuntimeParams,加入replicatorThrottler
遍历所有的规则,如果是broadcast类型的规则就退出内层循环,将DataSource的name加入到broadcastDatasources中
遍历所有的DataSource,根据数据源的name获取到该数据源的所有的规则(这一步是为了拿到所有broadcast的数据源)
如果是overshadowed segment则退出本次循环
判断是否需要分配;判断是否有可用的server来分配;判断是否满足限流条件;满足条件后开始分配:首先注册限流,然后loadSegment,load执行完成后移除掉注册(这里根据不同的load实现是不同的,http的方式会等待historical将segment load完成后返回,zk的方式只是将segment写到historical的loadQueue中)
assignReplicas() -> assignReplicasForTier()
assignPrimary() 分配主副本, 分配后分配副分片
assignReplicas() 继续分配其他副本
assign()
正在loading时不进行drop
tier中当前的副本数大于目标副本数,则进行dropForTier, drop分为从activeServers和decomissionServers分别drop
drop()
LoadRule: 如果主副本已经存在或者正在load,则分配副分片;否则依次分配
UPDATE %s SET used=false WHERE id = :segmentID
DropRule: coordinator.markSegmentAsUnused(segment); 将segment置为unused状态
drop() 从所有server中的loadPeon中drop掉segment
BroadcastDistributionRule 获取到所有可以load broadcast类型segment的server
rule.run(),里边有两个变量:targetReplicants为当前segment的目标副本数 currentReplicants为当前segment的当前副本数(包括已经load和正在loading之和)
遍历所有used状态的segment
对所有状态为used的segment运行所有匹配的规则
RunRules
遍历broadcastDataSource,初始化status为true
按dataSource的segments遍历,dropSegment
遍历server中的dataSource,首先判断是否为broadcastDataSource,是则退出当前循环
分别处理historical、broker、realtime中标记为unused的segments
UnloadUnusedSegments
MarkAsUnusedOvershadowedSegments
将该层的historical按照是否活着(decomminssion)分为两组, decommissioningServers和activeServers。 balance的一个功能就是将已经dead的historical上的segment移动到active的historical
计算出来该层所有的historical中一共有多少segment,如果是0则不需要balance
maxSegmentsToMove 最大要移动的segment数量 ; maxIterations 最大迭代次数;maxToLoad 配置的loadingQueue中最大数量
broadcast类型的segment不做balance
遍历所有的segment,先决定移动k个,然后再随机加入若干个
ReservoirSegmentSampler.getRandomBalancerSegmentHolders
strategy.pickSegmentsToMove 根据配置的balance策略选择将要移动的segment
开始循环进行移动选出来的segment, toMoveToWithLoadQueueCapacityAndNotServingSegment 这个表示移动到的目标机器,这个机器必须不是原来移动前的机器,同时它的loadQueue也得有容量
空间不够或者正在loading segment的historical不参与分配
同一个DataSource的要尽量在一起,更可能被同时查询
Interval更接近的要尽量在一起,更可能被同时查询
https://github.com/apache/druid/pull/2972里边有详细的描述
当前segment的cost加上即将load的segment的cost减去将被drop的segment的cost
线程池执行计算computeCost
遍历选择的结果,找到得分最大的;如果得分是正无穷,说明没有合适的,不用分配;最后得分相同的话随机取一个
chooseBestServer()
strategy.findNewSegmentHomeBalancer 从上述的目标机器中再根据配置策略找到一个目标机器
一系列校验(segment不能为null,fromServer和toServer不能相同,元数据中能找到segment, LoadQueuePeon需要被创建,目标机器还能放的下)
loadSegment 先判断是否已经在load的队列中,然后执行
成功move++, 不成功unmoved++,循环次数超过maxIterations则停止
balanceServers()
按层开始进行balance,如果上次balance没有结束,则本次不进行
BalanceSegments,有四种balance策略,diskNormalized cost cachingCost random,默认为cost
EmitClusterStatsAndMetrics
makeHistoricalManagementDuties
获取到所有要kill掉unusedsegment的数据源
根据前缀、任务类型、数据源、时间间隔等生成任务ID
调用/druid/indexer/v1/task提交一个POST请求,然后处理返回结果
运行任务
遍历数据源,满足运行条件时则调用indexingServiceClient的killUnusedSegments方法提交一个kill task
KillUnusedSegments
KillStalePendingSegments
CoordinatorIndexingServiceDuty
makeIndexingServiceDuties
KillSupervisors
KillAuditLog
KillRules
KillDatasourceMetadata
KillCompactionConfig
CoordinatorMetadataStoreManagementDuty
makeMetadataStoreManagementDuties
将每一个DutyRunnable丢到定时调度线程池运行
添加各种的DutyRunnable
核心代码流程
DruidCoordinator
0 条评论
回复 删除
下一页