enqueue
将处于pending状态的job对应的PodGroup状态设置为inqueue,该状态会在会话关闭时更新到PodGroup。PodGroup状态变为inqueue后,controller会为其创建Pod。
pkg\scheduler\actions\enqueue\enqueue.go
1 func (enqueue *Action) Execute(ssn *framework.Session) { 2 klog.V(5).Infof("Enter Enqueue ...") 3 defer klog.V(5).Infof("Leaving Enqueue ...") 4 5 queues := util.NewPriorityQueue(ssn.QueueOrderFn) // 临时队列 6 queueSet := sets.NewString() // 临时集合 7 jobsMap := map[api.QueueID]*util.PriorityQueue{} // 临时字典:queueID与job队列对应关系 8 9 for _, job := range ssn.Jobs { 10 // 检查调度开始时间戳,如果是零则为其设置当前时间 11 if job.ScheduleStartTimestamp.IsZero() { 12 ssn.Jobs[job.UID].ScheduleStartTimestamp = metav1.Time{ 13 Time: time.Now(), 14 } 15 } 16 if queue, found := ssn.Queues[job.Queue]; !found { 17 klog.Errorf("Failed to find Queue <%s> for Job <%s/%s>", 18 job.Queue, job.Namespace, job.Name) 19 continue 20 } else if !queueSet.Has(string(queue.UID)) { 21 klog.V(5).Infof("Added Queue <%s> for Job <%s/%s>", 22 queue.Name, job.Namespace, job.Name) 23 24 queueSet.Insert(string(queue.UID)) 25 queues.Push(queue) 26 } 27 28 // 检查job的状态是不是pending,如果是则将其放到jobsMap的队列里 29 if job.IsPending() { 30 if _, found := jobsMap[job.Queue]; !found { 31 jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn) 32 } 33 klog.V(5).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue) 34 jobsMap[job.Queue].Push(job) 35 } 36 } 37 38 klog.V(3).Infof("Try to enqueue PodGroup to %d Queues", len(jobsMap)) 39 40 // 遍历PodGroup,将PodGroup状态改为inqueue,并在ssn.Jobs里记录job 41 for { 42 if queues.Empty() { 43 break 44 } 45 46 queue := queues.Pop().(*api.QueueInfo) 47 48 // skip the Queue that has no pending job 49 jobs, found := jobsMap[queue.UID] 50 if !found || jobs.Empty() { // 如果当前队列不存在或者job为空,则遍历下个队列 51 continue 52 } 53 job := jobs.Pop().(*api.JobInfo) 54 55 // 如果未设置最小资源或判定job可以入队,则对job进行处理: 56 // 1、对job执行插件的JobEnqueue处理 57 // 2、将PodGroup的状态改为inqueue 58 // 3、将job记录到ssn上 59 if job.PodGroup.Spec.MinResources == nil || ssn.JobEnqueueable(job) { 60 ssn.JobEnqueued(job) 61 job.PodGroup.Status.Phase = scheduling.PodGroupInqueue 62 ssn.Jobs[job.UID] = job 63 } 64 65 // Added Queue back until no job in Queue. 
66 queues.Push(queue) 67 } 68 }
调度周期完毕后,关闭会话时会更新PodGroup
1 func closeSession(ssn *Session) { 2 ju := NewJobUpdater(ssn) 3 ju.UpdateAll() // 更新所有 4 5 updateQueueStatus(ssn) 6 7 ssn.Jobs = nil 8 ssn.Nodes = nil 9 ssn.RevocableNodes = nil 10 ssn.plugins = nil 11 ssn.eventHandlers = nil 12 ssn.jobOrderFns = nil 13 ssn.queueOrderFns = nil 14 ssn.clusterOrderFns = nil 15 ssn.NodeList = nil 16 ssn.TotalResource = nil 17 18 klog.V(3).Infof("Close Session %v", ssn.UID) 19 } 20 21 // updateJob update specified job 22 func (ju *JobUpdater) updateJob(index int) { 23 job := ju.jobQueue[index] 24 ssn := ju.ssn 25 26 job.PodGroup.Status = jobStatus(ssn, job) 27 oldStatus, found := ssn.PodGroupOldState.Status[job.UID] 28 updatePGStatus := !found || isPodGroupStatusUpdated(job.PodGroup.Status, oldStatus) 29 updatePGAnnotations := ju.isJobAllocatedHyperNodeChanged(job) 30 if _, err := ssn.cache.UpdateJobStatus(job, updatePGStatus, updatePGAnnotations); err != nil { 31 klog.Errorf("Failed to update job <%s/%s>: %v", 32 job.Namespace, job.Name, err) 33 } 34 } 35 36 // UpdateJobStatus update the status of job and its tasks. 37 func (sc *SchedulerCache) UpdateJobStatus(job *schedulingapi.JobInfo, updatePGStatus, updatePGAnnotations bool) (*schedulingapi.JobInfo, error) { 38 if updatePGStatus || updatePGAnnotations { 39 if updatePGAnnotations { 40 sc.updateJobAnnotations(job) 41 } 42 pg, err := sc.StatusUpdater.UpdatePodGroup(job.PodGroup) 43 if err != nil { 44 return nil, err 45 } 46 job.PodGroup = pg 47 } 48 sc.RecordJobStatusEvent(job, updatePGStatus) 49 50 return job, nil 51 }