当前位置: 首页 > news > 正文

kubelet源码阅读(二)——device plugin 的ListAndWatch过程

从kubelet运行到plugin注册过程

kubernetes-master/pkg/kubelet/kubelet.go
 1 func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
 2     // ...
 3     go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop)
 4     // ...
 5 }
 6 
 7 func (kl *Kubelet) updateRuntimeUp() {
 8     // ...
 9     kl.runtimeState.setRuntimeState(nil)
10     kl.runtimeState.setRuntimeHandlers(s.Handlers)
11     kl.runtimeState.setRuntimeFeatures(s.Features)
12     kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules)
13     kl.runtimeState.setRuntimeSync(kl.clock.Now())
14 }
15 
16 func (kl *Kubelet) initializeRuntimeDependentModules() {
17     // Adding Registration Callback function for DRA Plugin and Device Plugin
18     for name, handler := range kl.containerManager.GetPluginRegistrationHandlers() {
19         kl.pluginManager.AddHandler(name, handler)
20     }
21 
22     // Start the plugin manager
23     klog.V(4).InfoS("Starting plugin manager")
24     go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
25 }
26 
27 func (cm *containerManagerImpl) GetPluginRegistrationHandlers() map[string]cache.PluginHandler {
28     res := map[string]cache.PluginHandler{
29         pluginwatcherapi.DevicePlugin: cm.deviceManager.GetWatcherHandler(),
30     }
31 
32     if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
33         res[pluginwatcherapi.DRAPlugin] = cm.draManager.GetWatcherHandler()
34     }
35 
36     return res
37 }
38 
39 kubernetes-master/pkg/kubelet/cm/devicemanager/manager.go
40 // GetWatcherHandler returns the plugin handler
41 func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler {
42     return m.server
43 }

 

启动插件管理器

 1 func (kl *Kubelet) initializeRuntimeDependentModules() {
 2     // Adding Registration Callback function for DRA Plugin and Device Plugin
 3     for name, handler := range kl.containerManager.GetPluginRegistrationHandlers() {
 4         kl.pluginManager.AddHandler(name, handler)
 5     }
 6 
 7     // Start the plugin manager
 8     klog.V(4).InfoS("Starting plugin manager")
 9     go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
10 }

详细代码

// 启动kubelet plugin manager
kubernetes-master/pkg/kubelet/pluginmanager/plugin_manager.go
func (pm *pluginManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {defer runtime.HandleCrash()if err := pm.desiredStateOfWorldPopulator.Start(stopCh); err != nil {klog.ErrorS(err, "The desired_state_of_world populator (plugin watcher) starts failed!")return}klog.V(2).InfoS("The desired_state_of_world populator (plugin watcher) starts")klog.InfoS("Starting Kubelet Plugin Manager")go pm.reconciler.Run(stopCh)metrics.Register(pm.actualStateOfWorld, pm.desiredStateOfWorld)<-stopChklog.InfoS("Shutting down Kubelet Plugin Manager")
}调谐device plugin,确保应该注册的插件已注册
kubernetes-master/pkg/kubelet/pluginmanager/reconciler/reconciler.go
func (rc *reconciler) reconcile() {// Unregisterations are triggered before registrations// 确保应该取消注册的插件已被取消注册。// Ensure plugins that should be unregistered are unregistered.for _, registeredPlugin := range rc.actualStateOfWorld.GetRegisteredPlugins() {unregisterPlugin := falseif !rc.desiredStateOfWorld.PluginExists(registeredPlugin.SocketPath) {unregisterPlugin = true} else {// We also need to unregister the plugins that exist in both actual state of world// and desired state of world cache, but the timestamps don't match.// Iterate through desired state of world plugins and see if there's any plugin// with the same socket path but different timestamp.for _, dswPlugin := range rc.desiredStateOfWorld.GetPluginsToRegister() {if dswPlugin.SocketPath == registeredPlugin.SocketPath && dswPlugin.UUID != registeredPlugin.UUID {klog.V(5).InfoS("An updated version of plugin has been found, unregistering the plugin first before reregistering", "plugin", registeredPlugin)unregisterPlugin = truebreak}}}if unregisterPlugin {klog.V(5).InfoS("Starting operationExecutor.UnregisterPlugin", "plugin", registeredPlugin)err := rc.operationExecutor.UnregisterPlugin(registeredPlugin, rc.actualStateOfWorld)if err != nil &&!goroutinemap.IsAlreadyExists(err) &&!exponentialbackoff.IsExponentialBackoff(err) {// Ignore goroutinemap.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.// Log all other errors.klog.ErrorS(err, "OperationExecutor.UnregisterPlugin failed", "plugin", registeredPlugin)}if err == nil {klog.V(1).InfoS("OperationExecutor.UnregisterPlugin started", "plugin", registeredPlugin)}}}// 确保应该注册的插件已注册// Ensure plugins that should be registered are registeredfor _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {if !rc.actualStateOfWorld.PluginExistsWithCorrectUUID(pluginToRegister) {klog.V(5).InfoS("Starting operationExecutor.RegisterPlugin", "plugin", pluginToRegister)err := 
rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.UUID, rc.getHandlers(), rc.actualStateOfWorld)if err != nil &&!goroutinemap.IsAlreadyExists(err) &&!exponentialbackoff.IsExponentialBackoff(err) {// Ignore goroutinemap.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.klog.ErrorS(err, "OperationExecutor.RegisterPlugin failed", "plugin", pluginToRegister)}if err == nil {klog.V(1).InfoS("OperationExecutor.RegisterPlugin started", "plugin", pluginToRegister)}}}
}kubernetes-master/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go
func (s *server) RegisterPlugin(pluginName string, endpoint string, versions []string, pluginClientTimeout *time.Duration) error {klog.V(2).InfoS("Registering plugin at endpoint", "plugin", pluginName, "endpoint", endpoint)return s.connectClient(pluginName, endpoint)
}func (s *server) connectClient(name string, socketPath string) error {c := NewPluginClient(name, socketPath, s.chandler)s.registerClient(name, c)if err := c.Connect(); err != nil {s.deregisterClient(name)klog.ErrorS(err, "Failed to connect to new client", "resource", name)return err}klog.V(2).InfoS("Connected to new client", "resource", name)go func() {s.runClient(name, c)}()return nil
}func (s *server) runClient(name string, c Client) {c.Run()c = s.getClient(name)if c == nil {return}if err := s.disconnectClient(name, c); err != nil {klog.ErrorS(err, "Unable to disconnect client", "resource", name, "client", c)}
}

 执行ListAndWatch

 1 kubernetes-master/pkg/kubelet/cm/devicemanager/plugin/v1beta1/client.go
 2 // Run is for running the device plugin gRPC client.
 3 func (c *client) Run() {
 4     stream, err := c.client.ListAndWatch(context.Background(), &api.Empty{})
 5     if err != nil {
 6         klog.ErrorS(err, "ListAndWatch ended unexpectedly for device plugin", "resource", c.resource)
 7         return
 8     }
 9 
10     // 死循环获取device plugin的设备列表
11     for {
12         response, err := stream.Recv()
13         if err != nil {
14             klog.ErrorS(err, "ListAndWatch ended unexpectedly for device plugin", "resource", c.resource)
15             return
16         }
17         klog.V(2).InfoS("State pushed for device plugin", "resource", c.resource, "resourceCapacity", len(response.Devices))
18         c.handler.PluginListAndWatchReceiver(c.resource, response)
19     }
20 }

设备接收处理

收到设备列表后,设备管理器会更新内存中的设备状态(设备数量与健康状况),并将已注册设备以及设备与容器的分配信息写入磁盘上的检查点(checkpoint)文件。

 1 kubernetes-master/pkg/kubelet/cm/devicemanager/manager.go
 2 // PluginListAndWatchReceiver receives ListAndWatchResponse from a device plugin
 3 // and ensures that an upto date state (e.g. number of devices and device health)
 4 // is captured. Also, registered device and device to container allocation
 5 // information is checkpointed to the disk.
 6 func (m *ManagerImpl) PluginListAndWatchReceiver(resourceName string, resp *pluginapi.ListAndWatchResponse) {
 7     var devices []pluginapi.Device
 8     for _, d := range resp.Devices {
 9         devices = append(devices, *d)
10     }
11     m.genericDeviceUpdateCallback(resourceName, devices)
12 }
13 
14 func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
15     healthyCount := 0
16     m.mutex.Lock()
17     m.healthyDevices[resourceName] = sets.New[string]()
18     m.unhealthyDevices[resourceName] = sets.New[string]()
19     oldDevices := m.allDevices[resourceName]
20     podsToUpdate := sets.New[string]()
21     m.allDevices[resourceName] = make(map[string]pluginapi.Device)
22     for _, dev := range devices {
23 
24         if utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) {
25             // compare with old device's health and send update to the channel if needed
26             updatePodUIDFn := func(deviceID string) {
27                 podUID, _ := m.podDevices.getPodAndContainerForDevice(deviceID)
28                 if podUID != "" {
29                     podsToUpdate.Insert(podUID)
30                 }
31             }
32             if oldDev, ok := oldDevices[dev.ID]; ok {
33                 if oldDev.Health != dev.Health {
34                     updatePodUIDFn(dev.ID)
35                 }
36             } else {
37                 // if this is a new device, it might have existed before and disappeared for a while
38                 // but still be assigned to a Pod. In this case, we need to send an update to the channel
39                 updatePodUIDFn(dev.ID)
40             }
41         }
42 
43         m.allDevices[resourceName][dev.ID] = dev
44         if dev.Health == pluginapi.Healthy {
45             m.healthyDevices[resourceName].Insert(dev.ID)
46             healthyCount++
47         } else {
48             m.unhealthyDevices[resourceName].Insert(dev.ID)
49         }
50     }
51     m.mutex.Unlock()
52 
53     if utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) {
54         if len(podsToUpdate) > 0 {
55             select {
56             case m.update <- resourceupdates.Update{PodUIDs: podsToUpdate.UnsortedList()}:
57             default:
58                 klog.ErrorS(goerrors.New("device update channel is full"), "discard pods info", "podsToUpdate", podsToUpdate.UnsortedList())
59             }
60         }
61     }
62 
63     if err := m.writeCheckpoint(); err != nil {
64         klog.ErrorS(err, "Writing checkpoint encountered")
65     }
66     klog.V(2).InfoS("Processed device updates for resource", "resourceName", resourceName, "totalCount", len(devices), "healthyCount", healthyCount)
67 }
68 
69 // 检出设备对容器分配信息到磁盘
70 // Checkpoints device to container allocation information to disk.
71 func (m *ManagerImpl) writeCheckpoint() error {
72     m.mutex.Lock()
73     registeredDevs := make(map[string][]string)
74     for resource, devices := range m.healthyDevices {
75         registeredDevs[resource] = devices.UnsortedList()
76     }
77     data := checkpoint.New(m.podDevices.toCheckpointData(),
78         registeredDevs)
79     m.mutex.Unlock()
80     err := m.checkpointManager.CreateCheckpoint(kubeletDeviceManagerCheckpoint, data)
81     if err != nil {
82         err2 := fmt.Errorf("failed to write checkpoint file %q: %v", kubeletDeviceManagerCheckpoint, err)
83         klog.ErrorS(err, "Failed to write checkpoint file")
84         return err2
85     }
86     klog.V(4).InfoS("Checkpoint file written", "checkpoint", kubeletDeviceManagerCheckpoint)
87     return nil
88 }

 

http://www.hskmm.com/?act=detail&tid=17045

相关文章:

  • CyberLink ColorDirector Ultra 2026 14.0.5712.0 视频后期调色
  • CF 1053 Div.2
  • vi编辑器
  • 豆油
  • MQTT
  • 源码安装fail2ban
  • 类的继承与继承的覆盖
  • linux shell awk 中括号 方括号 分割 []
  • springboot配置文件关系及加载顺序
  • 绩效面谈中的优质提问(一)
  • 简单博弈
  • 从 “纸笔清单” 到全栈引擎:数据填报与类 Excel 控件如何重塑企业效率曲线 - 详解
  • 触摸IC原厂 VKD223EB是一款低电流1通道触控1按键触摸芯片 HBM静电大于5KV
  • 09_五大IO模型
  • wsl Ubuntu 使用cmake
  • 做题笔记总板
  • day 4
  • AI元人文思想体系:从哲学基础到价值原语博弈的微观机制
  • 做题笔记16
  • 条件判断语句
  • EXCEL 行列转换
  • 做题笔记6
  • 第17章 Day20-Day21 逆向爬虫之瑞数6
  • 基于多假设跟踪(MHT)算法的MATLAB实现
  • ROS2之消息接口
  • Linux grep cut tomcat logs
  • Vona ORM分表全攻略
  • 如何在预算与风险之间做选择 iOS 混淆(源码混淆 vs IPA 混淆)的成本-收益分析与实战决策框架
  • 【兰州大学主办|EI稳定检索】第二届信息光学与光电技术国际学术会议(CIOT 2025)
  • 深入解析:设计模式-状态模式详解