kubernetes 的 client-go 源码解析
2022-08-03 09:18:57 0 举报
AI智能生成
登录查看完整内容
kubernetes 的 client-go 源码解析,关注我,后续将有更多的源码解析
作者其他创作
大纲/内容
主要分析 informer 的机制
说明
拿 deployment_controller_manager 来解析
源码位置 pkg/controller/deployment/deployment_controller_test.go:269
例子
f := newFixture(t)
metav1.TypeMeta{APIVersion: \"apps/v1\
Image: \"foo/bar\"
d := newDeployment()
f.expectCreateRSAction(rs)
f.expectUpdateDeploymentStatusAction(d)
期望的测试结果
f.client = fake.NewSimpleClientset(f.objects...)
defaultResync = 0
font color=\"#ff0000\
informers.Apps().V1().Deployments()
informers.Apps().V1().ReplicaSets()
informers.Core().V1().Pods()
入参
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events(\"\")})
client
eventRecorder
queue
DeploymentController{}
dc.rsControl = controller.RealRSControl()
f.informers[informerType] = informer
f.factory.InformerFor()
ListFunc
WatchFunc
ListWatch 对象
&appsv1.Deployment{} 关注的的对象
resyncPeriod 重新同步周期
indexers 缓存对象
objectType
processor
cacheMutationDetector
indexer
NewSharedIndexInformer() 创建 Informer
newFunc -> defaultInformer
dInformer.Informer()
listener := newProcessListener()
s.processor.addListener(listener)
AddEventHandlerWithResyncPeriod()
dc.addDeployment
dc.updateDeployment
dc.deleteDeployment
ResourceEventHandlerFuncs
dInformer.Informer().AddEventHandler()
rsInformer.Informer().AddEventHandler()
podInformer.Informer().AddEventHandler()
dc.syncHandler = dc.syncDeployment
dc.enqueueDeployment = dc.enqueue
dc.dLister = dInformer.Lister()
dc.rsLister = rsInformer.Lister()
dc.podLister = podInformer.Lister()
dc.dListerSynced = dInformer.Informer().HasSynced
dc.rsListerSynced = rsInformer.Informer().HasSynced
dc.podListerSynced = podInformer.Informer().HasSynced
NewDeploymentController
KnownObjects:s.indexer 缓存对象
EmitDeltaTypeReplaced: true
fifo := NewDeltaFIFOWithOptions
Queue: fifo
ListerWatcher: s.listerWatcher
ObjectType: s.objectType 关注的对象,比如 Deployment
ShouldResync: s.processor.shouldResync
Process: s.HandleDeltas
WatchErrorHandler: s.watchErrorHandler 默认为 null
cfg := &Config{}
s.controller = New(cfg)
s.cacheMutationDetector.Run()
font color=\"#ff00ff\
case addNotification: p.handler.OnAdd(notification.newObj)
case deleteNotification: p.handler.OnDelete(notification.oldObj)
p.wg.Start(listener.run) 处理 Notification
nextCh <- notification
p.wg.Start(listener.pop) 添加 Notification
s.processor.run()
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler)
r := NewReflector
这里是支持分页查询的,但是分页查询必须重新执行 ListAndWatch 函数
r.ShouldResync == nil || r.ShouldResync()
r.store.Resync()
启动一个 reSync() 的 goroutine
r.store.Add(event.Object)
watch.Added
r.store.Update(event.Object)
watch.Modified
r.store.Delete(event.Object)
watch.Deleted
不处理
watch.Bookmark
r.setLastSyncResourceVersion(newResourceVersion) 更新resourceVersion
r.watchHandler() 处理watch的资源
ListAndWatch(stopCh)
r.Run()
c.config.Queue.Pop() 从队列中弹出对象
PopProcessFunc(c.config.Process) 处理弹出的对象
c.processLoop
s.controller.Run(stopCh)
sharedIndexInformer.Run()
go informer.Run(stopCh)
informers.Start(stopCh)
分析
f.cond.Broadcast()
clientState.Update(obj)
clientState.Add(obj)
clientState.Delete(obj)
clientState 操作
handler.OnAdd(obj)
handler.OnDelete(obj)
handler 操作
processDeltas
更新本地缓存
client-go
0 条评论
回复 删除
下一页