kubernetes 的 client-go 源码解析
2022-08-03 09:18:57 0 举报
AI智能生成
kubernetes 的 client-go 源码解析,关注我,后续将有更多的源码解析
作者其他创作
大纲/内容
说明
主要分析 informer 的机制
例子
拿 deployment_controller_manager 来解析
源码位置 pkg/controller/deployment/deployment_controller_test.go:269
分析
f := newFixture(t)
<font color="#ff0000">d := newDeployment()</font>
metav1.TypeMeta{APIVersion: "apps/v1", Kind: "Deployment"}
Image: "foo/bar"
期望的测试结果
f.expectCreateRSAction(rs)
f.expectUpdateDeploymentStatusAction(d)
f.expectUpdateDeploymentStatusAction(d)
f.run(testutil.GetKey(d, t))
<font color="#ff0000">c, informers, err := f.newController()</font>
f.client = fake.NewSimpleClientset(f.objects...)
<font color="#ff0000">informers.NewSharedInformerFactory(f.client, controller.NoResyncPeriodFunc())</font>
defaultResync = 0
<font color="#ff0000">NewDeploymentController</font>
入参
informers.Apps().V1().Deployments()
informers.Apps().V1().ReplicaSets()
informers.Core().V1().Pods()
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})
DeploymentController{}
client
eventRecorder
queue
dc.rsControl = controller.RealRSControl()
<font color="#ff0000">dInformer.Informer().AddEventHandler()</font>
dInformer.Informer()
<font color="#ff00ff">f.factory.InformerFor()</font>
informer, exists := f.informers[informerType]
resyncPeriod, exists := f.customResync[informerType]
informer = newFunc(f.client, resyncPeriod)
f.informers[informerType] = informer
<font color="#ff00ff">newFunc -> defaultInformer</font>
ListWatch 对象
ListFunc
WatchFunc
&appsv1.Deployment{} 关注的的对象
resyncPeriod 重新同步周期
indexers 缓存对象
NewSharedIndexInformer() 创建 Informer
objectType
processor
cacheMutationDetector
indexer
<font color="#ff0000">AddEventHandlerWithResyncPeriod()</font>
listener := newProcessListener()
<font color="#ff0000">s.processor.addListener(listener)</font>
ResourceEventHandlerFuncs
dc.addDeployment
dc.updateDeployment
dc.deleteDeployment
rsInformer.Informer().AddEventHandler()
podInformer.Informer().AddEventHandler()
<font color="#ff0000">dc.syncHandler = dc.syncDeployment</font>
<font color="#ff0000">dc.enqueueDeployment = dc.enqueue</font>
<font color="#ff00ff">dc.dLister = dInformer.Lister()</font>
dc.rsLister = rsInformer.Lister()
dc.podLister = podInformer.Lister()
dc.dListerSynced = dInformer.Informer().HasSynced
dc.rsListerSynced = rsInformer.Informer().HasSynced
dc.podListerSynced = podInformer.Informer().HasSynced
for _, d := range f.dLister {<br> informers.Apps().V1().Deployments().Informer().GetIndexer().Add(d)<br> }
for _, rs := range f.rsLister {<br> informers.Apps().V1().ReplicaSets().Informer().GetIndexer().Add(rs)<br> }
for _, pod := range f.podLister {<br> informers.Core().V1().Pods().Informer().GetIndexer().Add(pod)<br> }
<font color="#ff0000">informers.Start(stopCh)</font>
<font color="#ff0000">go informer.Run(stopCh)</font>
sharedIndexInformer.Run()
fifo := NewDeltaFIFOWithOptions
KnownObjects:s.indexer 缓存对象
EmitDeltaTypeReplaced: true
cfg := &Config{}
Queue: fifo
ListerWatcher: s.listerWatcher
ObjectType: s.objectType 关注的对象,比如 Deployment
<font color="#ff00ff">ShouldResync: s.processor.shouldResync</font>
<font color="#ff00ff">Process: s.HandleDeltas</font>
WatchErrorHandler: s.watchErrorHandler 默认为 null
s.controller = New(cfg)
s.cacheMutationDetector.Run()
<font color="#ff0000">s.processor.run()</font>
<font color="#00ff00">p.wg.Start(listener.run) 处理 Notification</font>
<font color="#ff00ff">case updateNotification:<br> p.handler.OnUpdate(notification.oldObj, notification.newObj)</font>
<font color="#ff00ff">case addNotification:<br> p.handler.OnAdd(notification.newObj)</font>
<font color="#ff00ff">case deleteNotification:<br> p.handler.OnDelete(notification.oldObj)</font>
<font color="#00ff00">p.wg.Start(listener.pop) 添加 Notification</font>
nextCh <- notification
notificationToAdd, ok := <-p.addCh
s.controller.Run(stopCh)
r := NewReflector
watchErrorHandler: WatchErrorHandler(DefaultWatchErrorHandler)
store: c.config.Queue,
<font color="#00ff00">r.Run()</font>
<font color="#ff00ff">ListAndWatch(stopCh)</font>
pager.List(context.Background(), options)
这里是支持分页查询的,但是分页查询必须重新执行 ListAndWatch 函数
listMetaInterface, err := meta.ListAccessor(list)<br>resourceVersion = listMetaInterface.GetResourceVersion() 获取 resourceVersion<br>
items, err := meta.ExtractList(list)<br>r.syncWith(items, resourceVersion) 全量同步<br>
<font color="#ff00ff">r.store.Replace(found, resourceVersion)</font>
启动一个 reSync() 的 goroutine
r.ShouldResync == nil || r.ShouldResync()
<font color="#ff00ff">r.store.Resync()</font>
w, err := r.listerWatcher.Watch(options) 开始watch资源
r.watchHandler() 处理watch的资源
watch.Added
<font color="#ff00ff">r.store.Add(event.Object)</font>
watch.Modified
<font color="#ff00ff">r.store.Update(event.Object)</font>
watch.Deleted
<font color="#ff00ff">r.store.Delete(event.Object)</font>
watch.Bookmark
不处理
r.setLastSyncResourceVersion(newResourceVersion) 更新resourceVersion
r.watchErrorHandler(r, err)
<font color="#00ff00">c.processLoop</font>
c.config.Queue.Pop() 从队列中弹出对象
<font color="#2196f3">PopProcessFunc(c.config.Process) 处理弹出的对象</font>
c.syncDeployment(context.TODO(), deploymentName)
queueActionLocked(actionType DeltaType, obj interface{})
f.cond.Broadcast()
<font color="#ff00ff">processDeltas</font>
clientState 操作
clientState.Update(obj)
clientState.Add(obj)
clientState.Delete(obj)
handler 操作
handler.OnUpdate(old, obj)
handler.OnAdd(obj)
handler.OnDelete(obj)
更新本地缓存
0 条评论
下一页