当前位置: 首页 > news >正文

volcano源码阅读——action/enqueue

enqueue
将pending状态的job的PodGroup状态设置为inqueue,当会话关闭的时候会更新PodGroup状态。PodGroup状态变为inqueue后,controller会为其创建Pod。
 
pkg\scheduler\actions\enqueue\enqueue.go
 1 func (enqueue *Action) Execute(ssn *framework.Session) {
 2     klog.V(5).Infof("Enter Enqueue ...")
 3     defer klog.V(5).Infof("Leaving Enqueue ...")
 4 
 5     queues := util.NewPriorityQueue(ssn.QueueOrderFn) // 临时队列
 6     queueSet := sets.NewString()                      // 临时集合
 7     jobsMap := map[api.QueueID]*util.PriorityQueue{}  // 临时字典:queueID与job队列对应关系
 8 
 9     for _, job := range ssn.Jobs {
10         // 检查调度开始时间戳,如果是零则为其设置当前时间
11         if job.ScheduleStartTimestamp.IsZero() {
12             ssn.Jobs[job.UID].ScheduleStartTimestamp = metav1.Time{
13                 Time: time.Now(),
14             }
15         }
16         if queue, found := ssn.Queues[job.Queue]; !found {
17             klog.Errorf("Failed to find Queue <%s> for Job <%s/%s>",
18                 job.Queue, job.Namespace, job.Name)
19             continue
20         } else if !queueSet.Has(string(queue.UID)) {
21             klog.V(5).Infof("Added Queue <%s> for Job <%s/%s>",
22                 queue.Name, job.Namespace, job.Name)
23 
24             queueSet.Insert(string(queue.UID))
25             queues.Push(queue)
26         }
27 
28         // 检查job的状态是不是pending,如果是则将其放到jobsMap的队列里
29         if job.IsPending() {
30             if _, found := jobsMap[job.Queue]; !found {
31                 jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
32             }
33             klog.V(5).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
34             jobsMap[job.Queue].Push(job)
35         }
36     }
37 
38     klog.V(3).Infof("Try to enqueue PodGroup to %d Queues", len(jobsMap))
39 
40     // 遍历PodGroup,将PodGroup状态改为inqueue,并在ssn.Jobs里记录job
41     for {
42         if queues.Empty() {
43             break
44         }
45 
46         queue := queues.Pop().(*api.QueueInfo)
47 
48         // skip the Queue that has no pending job
49         jobs, found := jobsMap[queue.UID]
50         if !found || jobs.Empty() { // 如果当前队列不存在或者job为空,则遍历下个队列
51             continue
52         }
53         job := jobs.Pop().(*api.JobInfo)
54 
55         // 如果未设置最小资源或判定job可以入队,则对job进行处理:
56         // 1、对job执行插件的JobEnqueue处理
57         // 2、将PodGroup的状态改为inqueue
58         // 3、将job记录到ssn上
59         if job.PodGroup.Spec.MinResources == nil || ssn.JobEnqueueable(job) {
60             ssn.JobEnqueued(job)
61             job.PodGroup.Status.Phase = scheduling.PodGroupInqueue
62             ssn.Jobs[job.UID] = job
63         }
64 
65         // Added Queue back until no job in Queue.
66         queues.Push(queue)
67     }
68 }
 
调度周期完毕后,关闭会话时会更信PodGroup
 1 func closeSession(ssn *Session) {
 2     ju := NewJobUpdater(ssn)
 3     ju.UpdateAll() // 更新所有
 4 
 5     updateQueueStatus(ssn)
 6 
 7     ssn.Jobs = nil
 8     ssn.Nodes = nil
 9     ssn.RevocableNodes = nil
10     ssn.plugins = nil
11     ssn.eventHandlers = nil
12     ssn.jobOrderFns = nil
13     ssn.queueOrderFns = nil
14     ssn.clusterOrderFns = nil
15     ssn.NodeList = nil
16     ssn.TotalResource = nil
17 
18     klog.V(3).Infof("Close Session %v", ssn.UID)
19 }
20 
21 // updateJob update specified job
22 func (ju *JobUpdater) updateJob(index int) {
23     job := ju.jobQueue[index]
24     ssn := ju.ssn
25 
26     job.PodGroup.Status = jobStatus(ssn, job)
27     oldStatus, found := ssn.PodGroupOldState.Status[job.UID]
28     updatePGStatus := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus)
29     updatePGAnnotations := ju.isJobAllocatedHyperNodeChanged(job)
30     if _, err := ssn.cache.UpdateJobStatus(job, updatePGStatus, updatePGAnnotations); err != nil {
31         klog.Errorf("Failed to update job <%s/%s>: %v",
32             job.Namespace, job.Name, err)
33     }
34 }
35 
36 // UpdateJobStatus update the status of job and its tasks.
37 func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePGStatus, updatePGAnnotations bool) (*schedulingapi.JobInfo, error) {
38     if updatePGStatus || updatePGAnnotations {
39         if updatePGAnnotations {
40             sc.updateJobAnnotations(job)
41         }
42         pg, err := sc.StatusUpdater.UpdatePodGroup(job.PodGroup)
43         if err != nil {
44             return nil, err
45         }
46         job.PodGroup = pg
47     }
48     sc.RecordJobStatusEvent(job, updatePGStatus)
49 
50     return job, nil
51 }

 

 

http://www.hskmm.com/?act=detail&tid=32854

相关文章:

  • 2025年工业大吊扇厂家权威推荐榜:大型厂房通风降温设备源头企业综合实力与客户口碑深度解析
  • 【左扬精讲】SRE 别慌!我用 故障预测与诊断,性能评估与优化,资源分配与规划 讲概率与贝叶斯算法的实战应用,都是咱运维人能懂的话(含代码)
  • 农经权报表生成小程序介绍
  • 【2025-10-16】移居香港
  • 学校社团招新的题目(莫队+树状数组统计区间逆序对个数)(蒟蒻被薄纱QAQ)
  • 基于MATLAB的齿轮故障检测
  • Linux 中检测gz压缩文件是否损坏
  • 2025年信息流代运营服务商权威推荐榜:专业投放策略与高转化效果深度解析,助力企业精准营销
  • 2025 年 PP 管厂家最新推荐榜:全面甄选优质 pp 风管、PP 喷淋塔等产品厂家,助力实验室场景精准选型
  • 基于MATLAB的无线传感器网络(WSN)仿真程序实现
  • NMAP扫描
  • MyEMS:衔接 “双控” 政策与企业实践的开源能源管理利器
  • 权限维持-Windows权限维持
  • LVGL
  • 2025 电动轮椅厂家最新推荐榜:深度解析智能轻便 / 长续航 / 高安全国产优质品牌核心优势
  • 2025年信息流代运营服务商权威推荐榜单:专业投放策略与高效转化服务口碑之选
  • 一些框架
  • 1017
  • 2025 建筑工程施工总包公司最新推荐榜:聚焦质量管控与新锐势力,优质企业权威甄选
  • 2025 广州人力资源/派遣/外包/劳务外包/人事代理/推荐榜:精典人才创新 5 星领跑,适配招聘 / 测评 / 培训全场景企业需求
  • 反事实推理防御AI黑客攻击技术解析
  • 2025 年选矿行业 2 号油厂家最新推荐排行榜:环保型 / 新型 / JQ202/101/QX/BK201/323 起泡剂等产品权威筛选,助力企业选对优质供应商
  • 2025 年探伤仪厂商最新推荐榜单:涡流 / 超声波 / 管材 / 焊缝 / 无损探伤仪优质企业权威盘点
  • 微调 - Lora
  • 2025 年罗茨风机厂家最新推荐排行榜权威发布!深度解析各品牌优势助企业精准选型UNTW无泄漏/BRW水冷式罗茨风机厂家推荐
  • GoogleNet
  • 2025磨床主轴定制/磨床主轴非标定制/国产/进口/内圆/外圆/无心/平面/来图定制磨床电主轴厂家推荐榜:技术与口碑双优之选
  • 【树莓派】安装PostgreSQL
  • 2025年轮胎厂家权威推荐榜:舒适轮胎,耐磨轮胎,高性能轮胎与静音轮胎全系列选购指南
  • 史诗级警报:ASP.NET Core 被曝 CVSS 9.9 分漏洞,几乎所有.NET 版本无一幸免!