从kubelet运行到plugin注册过程
kubernetes-master/pkg/kubelet/kubelet.go
1 func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) { 2 // ... 3 go wait.Until(kl.updateRuntimeUp, 5*time.Second, wait.NeverStop) 4 // ... 5 } 6 7 func (kl *Kubelet) updateRuntimeUp() { 8 // ... 9 kl.runtimeState.setRuntimeState(nil) 10 kl.runtimeState.setRuntimeHandlers(s.Handlers) 11 kl.runtimeState.setRuntimeFeatures(s.Features) 12 kl.oneTimeInitializer.Do(kl.initializeRuntimeDependentModules) 13 kl.runtimeState.setRuntimeSync(kl.clock.Now()) 14 } 15 16 func (kl *Kubelet) initializeRuntimeDependentModules() { 17 // Adding Registration Callback function for DRA Plugin and Device Plugin 18 for name, handler := range kl.containerManager.GetPluginRegistrationHandlers() { 19 kl.pluginManager.AddHandler(name, handler) 20 } 21 22 // Start the plugin manager 23 klog.V(4).InfoS("Starting plugin manager") 24 go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop) 25 } 26 27 func (cm *containerManagerImpl) GetPluginRegistrationHandlers() map[string]cache.PluginHandler { 28 res := map[string]cache.PluginHandler{ 29 pluginwatcherapi.DevicePlugin: cm.deviceManager.GetWatcherHandler(), 30 } 31 32 if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) { 33 res[pluginwatcherapi.DRAPlugin] = cm.draManager.GetWatcherHandler() 34 } 35 36 return res 37 } 38 39 kubernetes-master/pkg/kubelet/cm/devicemanager/manager.go 40 // GetWatcherHandler returns the plugin handler 41 func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler { 42 return m.server 43 }
启动插件管理器
1 func (kl *Kubelet) initializeRuntimeDependentModules() { 2 // Adding Registration Callback function for DRA Plugin and Device Plugin 3 for name, handler := range kl.containerManager.GetPluginRegistrationHandlers() { 4 kl.pluginManager.AddHandler(name, handler) 5 } 6 7 // Start the plugin manager 8 klog.V(4).InfoS("Starting plugin manager") 9 go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop) 10 }
详细代码
// 启动kubelet plugin manager kubernetes-master/pkg/kubelet/pluginmanager/plugin_manager.go func (pm *pluginManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {defer runtime.HandleCrash()if err := pm.desiredStateOfWorldPopulator.Start(stopCh); err != nil {klog.ErrorS(err, "The desired_state_of_world populator (plugin watcher) starts failed!")return}klog.V(2).InfoS("The desired_state_of_world populator (plugin watcher) starts")klog.InfoS("Starting Kubelet Plugin Manager")go pm.reconciler.Run(stopCh)metrics.Register(pm.actualStateOfWorld, pm.desiredStateOfWorld)<-stopChklog.InfoS("Shutting down Kubelet Plugin Manager") }调谐device plugin,确保应该注册的插件已注册 kubernetes-master/pkg/kubelet/pluginmanager/reconciler/reconciler.go func (rc *reconciler) reconcile() {// Unregisterations are triggered before registrations// 确保应该取消注册的插件已被取消注册。// Ensure plugins that should be unregistered are unregistered.for _, registeredPlugin := range rc.actualStateOfWorld.GetRegisteredPlugins() {unregisterPlugin := falseif !rc.desiredStateOfWorld.PluginExists(registeredPlugin.SocketPath) {unregisterPlugin = true} else {// We also need to unregister the plugins that exist in both actual state of world// and desired state of world cache, but the timestamps don't match.// Iterate through desired state of world plugins and see if there's any plugin// with the same socket path but different timestamp.for _, dswPlugin := range rc.desiredStateOfWorld.GetPluginsToRegister() {if dswPlugin.SocketPath == registeredPlugin.SocketPath && dswPlugin.UUID != registeredPlugin.UUID {klog.V(5).InfoS("An updated version of plugin has been found, unregistering the plugin first before reregistering", "plugin", registeredPlugin)unregisterPlugin = truebreak}}}if unregisterPlugin {klog.V(5).InfoS("Starting operationExecutor.UnregisterPlugin", "plugin", registeredPlugin)err := rc.operationExecutor.UnregisterPlugin(registeredPlugin, rc.actualStateOfWorld)if err != nil &&!goroutinemap.IsAlreadyExists(err) 
&&!exponentialbackoff.IsExponentialBackoff(err) {// Ignore goroutinemap.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.// Log all other errors.klog.ErrorS(err, "OperationExecutor.UnregisterPlugin failed", "plugin", registeredPlugin)}if err == nil {klog.V(1).InfoS("OperationExecutor.UnregisterPlugin started", "plugin", registeredPlugin)}}}// 确保应该注册的插件已注册// Ensure plugins that should be registered are registeredfor _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {if !rc.actualStateOfWorld.PluginExistsWithCorrectUUID(pluginToRegister) {klog.V(5).InfoS("Starting operationExecutor.RegisterPlugin", "plugin", pluginToRegister)err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.UUID, rc.getHandlers(), rc.actualStateOfWorld)if err != nil &&!goroutinemap.IsAlreadyExists(err) &&!exponentialbackoff.IsExponentialBackoff(err) {// Ignore goroutinemap.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.klog.ErrorS(err, "OperationExecutor.RegisterPlugin failed", "plugin", pluginToRegister)}if err == nil {klog.V(1).InfoS("OperationExecutor.RegisterPlugin started", "plugin", pluginToRegister)}}} }kubernetes-master/pkg/kubelet/cm/devicemanager/plugin/v1beta1/handler.go func (s *server) RegisterPlugin(pluginName string, endpoint string, versions []string, pluginClientTimeout *time.Duration) error {klog.V(2).InfoS("Registering plugin at endpoint", "plugin", pluginName, "endpoint", endpoint)return s.connectClient(pluginName, endpoint) }func (s *server) connectClient(name string, socketPath string) error {c := NewPluginClient(name, socketPath, s.chandler)s.registerClient(name, c)if err := c.Connect(); err != nil {s.deregisterClient(name)klog.ErrorS(err, "Failed to connect to new client", "resource", name)return err}klog.V(2).InfoS("Connected to new client", "resource", name)go func() {s.runClient(name, c)}()return nil }func (s *server) runClient(name 
string, c Client) {c.Run()c = s.getClient(name)if c == nil {return}if err := s.disconnectClient(name, c); err != nil {klog.ErrorS(err, "Unable to disconnect client", "resource", name, "client", c)} }
执行ListAndWatch
1 kubernetes-master/pkg/kubelet/cm/devicemanager/plugin/v1beta1/client.go 2 // Run is for running the device plugin gRPC client. 3 func (c *client) Run() { 4 stream, err := c.client.ListAndWatch(context.Background(), &api.Empty{}) 5 if err != nil { 6 klog.ErrorS(err, "ListAndWatch ended unexpectedly for device plugin", "resource", c.resource) 7 return 8 } 9 10 // 死循环获取device plugin的设备列表 11 for { 12 response, err := stream.Recv() 13 if err != nil { 14 klog.ErrorS(err, "ListAndWatch ended unexpectedly for device plugin", "resource", c.resource) 15 return 16 } 17 klog.V(2).InfoS("State pushed for device plugin", "resource", c.resource, "resourceCapacity", len(response.Devices)) 18 c.handler.PluginListAndWatchReceiver(c.resource, response) 19 } 20 }
设备接收处理
收到 ListAndWatch 响应后,设备管理器会更新内存中的设备状态(设备列表及健康状况),并将已注册的设备以及设备与容器的分配信息以检查点(checkpoint)的形式写入磁盘。
1 kubernetes-master/pkg/kubelet/cm/devicemanager/manager.go 2 // PluginListAndWatchReceiver receives ListAndWatchResponse from a device plugin 3 // and ensures that an upto date state (e.g. number of devices and device health) 4 // is captured. Also, registered device and device to container allocation 5 // information is checkpointed to the disk. 6 func (m *ManagerImpl) PluginListAndWatchReceiver(resourceName string, resp *pluginapi.ListAndWatchResponse) { 7 var devices []pluginapi.Device 8 for _, d := range resp.Devices { 9 devices = append(devices, *d) 10 } 11 m.genericDeviceUpdateCallback(resourceName, devices) 12 } 13 14 func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) { 15 healthyCount := 0 16 m.mutex.Lock() 17 m.healthyDevices[resourceName] = sets.New[string]() 18 m.unhealthyDevices[resourceName] = sets.New[string]() 19 oldDevices := m.allDevices[resourceName] 20 podsToUpdate := sets.New[string]() 21 m.allDevices[resourceName] = make(map[string]pluginapi.Device) 22 for _, dev := range devices { 23 24 if utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) { 25 // compare with old device's health and send update to the channel if needed 26 updatePodUIDFn := func(deviceID string) { 27 podUID, _ := m.podDevices.getPodAndContainerForDevice(deviceID) 28 if podUID != "" { 29 podsToUpdate.Insert(podUID) 30 } 31 } 32 if oldDev, ok := oldDevices[dev.ID]; ok { 33 if oldDev.Health != dev.Health { 34 updatePodUIDFn(dev.ID) 35 } 36 } else { 37 // if this is a new device, it might have existed before and disappeared for a while 38 // but still be assigned to a Pod. 
In this case, we need to send an update to the channel 39 updatePodUIDFn(dev.ID) 40 } 41 } 42 43 m.allDevices[resourceName][dev.ID] = dev 44 if dev.Health == pluginapi.Healthy { 45 m.healthyDevices[resourceName].Insert(dev.ID) 46 healthyCount++ 47 } else { 48 m.unhealthyDevices[resourceName].Insert(dev.ID) 49 } 50 } 51 m.mutex.Unlock() 52 53 if utilfeature.DefaultFeatureGate.Enabled(features.ResourceHealthStatus) { 54 if len(podsToUpdate) > 0 { 55 select { 56 case m.update <- resourceupdates.Update{PodUIDs: podsToUpdate.UnsortedList()}: 57 default: 58 klog.ErrorS(goerrors.New("device update channel is full"), "discard pods info", "podsToUpdate", podsToUpdate.UnsortedList()) 59 } 60 } 61 } 62 63 if err := m.writeCheckpoint(); err != nil { 64 klog.ErrorS(err, "Writing checkpoint encountered") 65 } 66 klog.V(2).InfoS("Processed device updates for resource", "resourceName", resourceName, "totalCount", len(devices), "healthyCount", healthyCount) 67 } 68 69 // 检出设备对容器分配信息到磁盘 70 // Checkpoints device to container allocation information to disk. 71 func (m *ManagerImpl) writeCheckpoint() error { 72 m.mutex.Lock() 73 registeredDevs := make(map[string][]string) 74 for resource, devices := range m.healthyDevices { 75 registeredDevs[resource] = devices.UnsortedList() 76 } 77 data := checkpoint.New(m.podDevices.toCheckpointData(), 78 registeredDevs) 79 m.mutex.Unlock() 80 err := m.checkpointManager.CreateCheckpoint(kubeletDeviceManagerCheckpoint, data) 81 if err != nil { 82 err2 := fmt.Errorf("failed to write checkpoint file %q: %v", kubeletDeviceManagerCheckpoint, err) 83 klog.ErrorS(err, "Failed to write checkpoint file") 84 return err2 85 } 86 klog.V(4).InfoS("Checkpoint file written", "checkpoint", kubeletDeviceManagerCheckpoint) 87 return nil 88 }