当前位置: 首页 > news >正文

K8s源码分析(二)-K8s调度队列介绍

本文首发在个人博客上,欢迎来踩!
本次分析参考的K8s版本是

文章目录

  • 调度队列简介
  • 调度队列源代码分析
    • 队列初始化
    • QueuedPodInfo元素介绍
    • ActiveQ源代码介绍
    • UnschedulableQ源代码介绍
    • **BackoffQ**源代码介绍
    • 队列弹出待调度的Pod
    • 队列增加新的待调度的Pod
    • pod调度失败返回队列的处理
    • flushBackoffQCompleted
    • flushUnschedulablePodsLeftover
  • 调度队列总结

调度队列简介

这里是官方对于K8s中调度队列的介绍,很值得一看:Scheduling queue in kube-scheduler。整体的架构如下图所示。

请添加图片描述

简单来说K8s中的调度队列主要有3种:

  • ActiveQ(heap结构):在每个调度周期开始时都会从这里取出一个Pod尝试调度。一开始提交的所有没有指定.spec.nodeName的Pod都会发送到这里,也会接收来自unschedulableQ和BackoffQ刷新来的pod。默认的排序规则是按照优先级进行排列,高优先级的Pod在前面。
  • UnschedulableQ(Map结构):存储调度失败的Pod,以等待资源更新、其他相关Pod调度成功等事件,从而将其的Pod其进行重调度。
  • BackoffQ(heap结构):用来暂时退避的队列,默认的排列规则是按退避时间的长度进行排序,需要退避的时间短的Pod在前面。为了防止Pod频繁的重调度,每个Pod都会记录自己的重调度次数,退避时间随着每次失败的调度尝试呈指数增长,直到达到最大值,例如尝试失败 3 次的 Pod 的目标退避超时设置为 curTime + 2s^3 (8s)。注意有两种情况下Pod会进入到BackoffQ队列中:
    • unscheduleableQ会定时对其中的所有pod进行重调度,那么就需要计算各个pod是否退避了足够的时间,如果没有就放入到BackoffQ中再退避一段时间。
    • 如果一个Pod调度失败时,正好这时又异步地发生了资源变更事件(p.moveRequestCycle **>=** podSchedulingCycle )(schedulingCycle 是当前调度的周期,ActiveQ队列每pop一个pod,就加1,moveRequestCycle是事件发生时schedulingCycle 的值),那么就不会放入UnschedulableQ中,而是会直接放入到BackoffQ中。

调度队列机制有两个在后台运行的定期刷新 go协程,负责将 pod 移动到活动队列,后续也将详细介绍相关代码:

  • **flushUnschedulablePodsLeftover:**每 30 秒运行一次,将 Pod 从UnschedulableQ中移动,以允许未由任何事件移动的不可调度的 Pod 再次重试。
  • **flushBackoffQCompleted:**每1秒运行一次,将BackoffQ中已经回避了足够久的Pod移动到ActiveQ队列中

移动请求(move request)会触发一个事件,该事件负责将 Pod 从UnschedulableQ移动到ActiveQ或BackoffQ。集群中许多事件可以触发移动请求的发生,包括了 Pod、节点、服务、PV、PVC、存储类和 CSI 节点的更改。例如当某些pod被调度时,UnschedulableQ中与其具有亲和性要求而导致之前无法调度的pod就会被移动出去,或者当某个新node加入时,原本因为资源不够导致无法调度的Pod也会被移动出去。

调度队列源代码分析

队列初始化

Scheduler中的调度队列SchedulingQueueinternalqueue.SchedulingQueue类型,该类型的实现在pkg/scheduler/internal/queue/scheduling_queue.go:92,如下。

// SchedulingQueue is an interface for a queue to store pods waiting to be scheduled.
// The interface follows a pattern similar to cache.FIFO and cache.Heap and
// makes it easy to use those data structures as a SchedulingQueue.
type SchedulingQueue interface {framework.PodNominatorAdd(pod *v1.Pod) error// Activate moves the given pods to activeQ iff they're in unschedulablePods or backoffQ.// The passed-in pods are originally compiled from plugins that want to activate Pods,// by injecting the pods through a reserved CycleState struct (PodsToActivate).Activate(pods map[string]*v1.Pod)// AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.// The podSchedulingCycle represents the current scheduling cycle number which can be// returned by calling SchedulingCycle().AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error// SchedulingCycle returns the current number of scheduling cycle which is// cached by scheduling queue. Normally, incrementing this number whenever// a pod is popped (e.g. called Pop()) is enough.SchedulingCycle() int64// Pop removes the head of the queue and returns it. It blocks if the// queue is empty and waits until a new item is added to the queue.Pop() (*framework.QueuedPodInfo, error)Update(oldPod, newPod *v1.Pod) errorDelete(pod *v1.Pod) errorMoveAllToActiveOrBackoffQueue(event framework.ClusterEvent, preCheck PreEnqueueCheck)AssignedPodAdded(pod *v1.Pod)AssignedPodUpdated(pod *v1.Pod)PendingPods() ([]*v1.Pod, string)// Close closes the SchedulingQueue so that the goroutine which is// waiting to pop items can exit gracefully.Close()// Run starts the goroutines managing the queue.Run()
}

上述代码定义了其需要的对队列中的元素添加、删除、更新、获取、运行等方法。而其标准实现PriorityQueue

pkg/scheduler/internal/queue/scheduling_queue.go:145 中,首先查看其需要的变量:

// PriorityQueue implements a scheduling queue.
// The head of PriorityQueue is the highest priority pending pod. This structure
// has two sub queues and a additional data structure, namely: activeQ,
// backoffQ and unschedulablePods.
//   - activeQ holds pods that are being considered for scheduling.
//   - backoffQ holds pods that moved from unschedulablePods and will move to
//     activeQ when their backoff periods complete.
//   - unschedulablePods holds pods that were already attempted for scheduling and
//     are currently determined to be unschedulable.
type PriorityQueue struct {*nominatorstop  chan struct{}clock clock.Clock// pod initial backoff duration.podInitialBackoffDuration time.Duration// pod maximum backoff duration.podMaxBackoffDuration time.Duration// the maximum time a pod can stay in the unschedulablePods.podMaxInUnschedulablePodsDuration time.Durationcond sync.Cond// activeQ is heap structure that scheduler actively looks at to find pods to// schedule. Head of heap is the highest priority pod.activeQ *heap.Heap// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff// are popped from this heap before the scheduler looks at activeQpodBackoffQ *heap.Heap// unschedulablePods holds pods that have been tried and determined unschedulable.unschedulablePods *UnschedulablePods// schedulingCycle represents sequence number of scheduling cycle and is incremented// when a pod is popped.schedulingCycle int64// moveRequestCycle caches the sequence number of scheduling cycle when we// received a move request. Unschedulable pods in and before this scheduling// cycle will be put back to activeQueue if we were trying to schedule them// when we received move request.moveRequestCycle int64clusterEventMap map[framework.ClusterEvent]sets.String// preEnqueuePluginMap is keyed with profile name, valued with registered preEnqueue plugins.preEnqueuePluginMap map[string][]framework.PreEnqueuePlugin// closed indicates that the queue is closed.// It is mainly used to let Pop() exit its control loop while waiting for an item.closed boolnsLister listersv1.NamespaceListermetricsRecorder metrics.MetricAsyncRecorder// pluginMetricsSamplePercent is the percentage of plugin metrics to be sampled.pluginMetricsSamplePercent int
}

pkg/scheduler/internal/queue/scheduling_queue.go:291 中给出了生成了该队列的初始化方法

// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(lessFn framework.LessFunc,informerFactory informers.SharedInformerFactory,opts ...Option,
) *PriorityQueue {options := defaultPriorityQueueOptionsif options.podLister == nil {options.podLister = informerFactory.Core().V1().Pods().Lister()}for _, opt := range opts {opt(&options)}comp := func(podInfo1, podInfo2 interface{}) bool {pInfo1 := podInfo1.(*framework.QueuedPodInfo)pInfo2 := podInfo2.(*framework.QueuedPodInfo)return lessFn(pInfo1, pInfo2)}pq := &PriorityQueue{nominator:                         newPodNominator(options.podLister),clock:                             options.clock,stop:                              make(chan struct{}),podInitialBackoffDuration:         options.podInitialBackoffDuration,podMaxBackoffDuration:             options.podMaxBackoffDuration,podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,activeQ:                           heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),unschedulablePods:                 newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()),moveRequestCycle:                  -1,clusterEventMap:                   options.clusterEventMap,preEnqueuePluginMap:               options.preEnqueuePluginMap,metricsRecorder:                   options.metricsRecorder,pluginMetricsSamplePercent:        options.pluginMetricsSamplePercent,}pq.cond.L = &pq.lockpq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()return pq
}

可以看到其包含了许多我们上面介绍的概念,包括activeQunschedulablePodspodBackoffQschedulingCyclemoveRequestCycle

QueuedPodInfo元素介绍

这里也多次出现了QueuedPodInfo这个关键的数据结构,它是Pod中的基础元素,在此进行介绍,其定义在pkg/scheduler/framework/types.go:98 中,包括了PodInfo、添加时间、尝试次数等

// QueuedPodInfo is a Pod wrapper with additional information related to
// the pod's status in the scheduling queue, such as the timestamp when
// it's added to the queue.
type QueuedPodInfo struct {*PodInfo// The time pod added to the scheduling queue.Timestamp time.Time// Number of schedule attempts before successfully scheduled.// It's used to record the # attempts metric.Attempts int// The time when the pod is added to the queue for the first time. The pod may be added// back to the queue multiple times before it's successfully scheduled.// It shouldn't be updated once initialized. It's used to record the e2e scheduling// latency for a pod.InitialAttemptTimestamp time.Time// If a Pod failed in a scheduling cycle, record the plugin names it failed by.UnschedulablePlugins sets.String// Whether the Pod is scheduling gated (by PreEnqueuePlugins) or not.Gated bool
}

PodInfo的定义在pkg/scheduler/framework/types.go:131

// PodInfo is a wrapper to a Pod with additional pre-computed information to
// accelerate processing. This information is typically immutable (e.g., pre-processed
// inter-pod affinity selectors).
type PodInfo struct {Pod                        *v1.PodRequiredAffinityTerms      []AffinityTermRequiredAntiAffinityTerms  []AffinityTermPreferredAffinityTerms     []WeightedAffinityTermPreferredAntiAffinityTerms []WeightedAffinityTerm
}

Pod的定义在staging/src/k8s.io/api/core/v1/types.go:4202

// Pod is a collection of containers that can run on a host. This resource is created
// by clients and scheduled onto hosts.
type Pod struct {metav1.TypeMeta `json:",inline"`// Standard object's metadata.// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#metadata// +optionalmetav1.ObjectMeta `json:"metadata,omitempty" protobuf:"bytes,1,opt,name=metadata"`// Specification of the desired behavior of the pod.// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status// +optionalSpec PodSpec `json:"spec,omitempty" protobuf:"bytes,2,opt,name=spec"`// Most recently observed status of the pod.// This data may not be up to date.// Populated by the system.// Read-only.// More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status// +optionalStatus PodStatus `json:"status,omitempty" protobuf:"bytes,3,opt,name=status"`
}

ActiveQ源代码介绍

从初始化代码中可以看到ActiveQ是一个heap,其相关定义在pkg/scheduler/internal/heap/heap.go

// Heap is a producer/consumer queue that implements a heap data structure.
// It can be used to implement priority queues and similar data structures.
type Heap struct {// data stores objects and has a queue that keeps their ordering according// to the heap invariant.data *data// metricRecorder updates the counter when elements of a heap get added or// removed, and it does nothing if it's nilmetricRecorder metrics.MetricRecorder
}
// data is an internal struct that implements the standard heap interface
// and keeps the data stored in the heap.
type data struct {// items is a map from key of the objects to the objects and their index.// We depend on the property that items in the map are in the queue and vice versa.items map[string]*heapItem// queue implements a heap data structure and keeps the order of elements// according to the heap invariant. The queue keeps the keys of objects stored// in "items".queue []string// keyFunc is used to make the key used for queued item insertion and retrieval, and// should be deterministic.keyFunc KeyFunc// lessFunc is used to compare two objects in the heap.lessFunc lessFunc
}

可以看到他是用queue实现了一个heap。

ActiveQ的默认排序代码在pkg/scheduler/framework/plugins/queuesort/priority_sort.go:42中,即按优先级进行排序,如果优先级相同就提交时间的早晚进行排序。

// Less is the function used by the activeQ heap algorithm to sort pods.
// It sorts pods based on their priority. When priorities are equal, it uses
// PodQueueInfo.timestamp.
func (pl *PrioritySort) Less(pInfo1, pInfo2 *framework.QueuedPodInfo) bool {p1 := corev1helpers.PodPriority(pInfo1.Pod)p2 := corev1helpers.PodPriority(pInfo2.Pod)return (p1 > p2) || (p1 == p2 && pInfo1.Timestamp.Before(pInfo2.Timestamp))
}

UnschedulableQ源代码介绍

UnschedulableQ进行初始化的具体代码在pkg/scheduler/internal/queue/scheduling_queue.go:998

// newUnschedulablePods initializes a new object of UnschedulablePods.
func newUnschedulablePods(unschedulableRecorder, gatedRecorder metrics.MetricRecorder) *UnschedulablePods {return &UnschedulablePods{podInfoMap:            make(map[string]*framework.QueuedPodInfo),keyFunc:               util.GetPodFullName,unschedulableRecorder: unschedulableRecorder,gatedRecorder:         gatedRecorder,}
}

其具体的定义代码在pkg/scheduler/internal/queue/scheduling_queue.go:939 ,

// UnschedulablePods holds pods that cannot be scheduled. This data structure
// is used to implement unschedulablePods.
type UnschedulablePods struct {// podInfoMap is a map key by a pod's full-name and the value is a pointer to the QueuedPodInfo.podInfoMap map[string]*framework.QueuedPodInfokeyFunc    func(*v1.Pod) string// unschedulableRecorder/gatedRecorder updates the counter when elements of an unschedulablePodsMap// get added or removed, and it does nothing if it's nil.unschedulableRecorder, gatedRecorder metrics.MetricRecorder
}

可以看到他没有进行heap的包装,而是直接采用Map结构进行保存。

BackoffQ源代码介绍

BackoffQ也是一个heap,与ActiveQ不同的一点在于排序函数不同,其排序函数的定义在pkg/scheduler/internal/queue/scheduling_queue.go:888

func (p *PriorityQueue) podsCompareBackoffCompleted(podInfo1, podInfo2 interface{}) bool {pInfo1 := podInfo1.(*framework.QueuedPodInfo)pInfo2 := podInfo2.(*framework.QueuedPodInfo)bo1 := p.getBackoffTime(pInfo1)bo2 := p.getBackoffTime(pInfo2)return bo1.Before(bo2)
}

getBackoffTime的定义在pkg/scheduler/internal/queue/scheduling_queue.go:911中,即计算完成避让的时间

// getBackoffTime returns the time that podInfo completes backoff
func (p *PriorityQueue) getBackoffTime(podInfo *framework.QueuedPodInfo) time.Time {duration := p.calculateBackoffDuration(podInfo)backoffTime := podInfo.Timestamp.Add(duration)return backoffTime
}

可以看到队列排序时会将完成避让最早的pod放在前面。

然后再看其是如何计算避让时间的,在pkg/scheduler/internal/queue/scheduling_queue.go

// calculateBackoffDuration is a helper function for calculating the backoffDuration
// based on the number of attempts the pod has made.
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.QueuedPodInfo) time.Duration {duration := p.podInitialBackoffDurationfor i := 1; i < podInfo.Attempts; i++ {// Use subtraction instead of addition or multiplication to avoid overflow.if duration > p.podMaxBackoffDuration-duration {return p.podMaxBackoffDuration}duration += duration}return duration
}

其计算可以理解为初次为p.podInitialBackoffDuration,每次需要的避让时间都是前一次的两倍,如果计算得到的避让时间大于p.podMaxBackoffDuration/2 ,就将避让时间设置为p.podMaxBackoffDuration

队列弹出待调度的Pod

其代码在pkg/scheduler/internal/queue/scheduling_queue.go:593

// Pop removes the head of the active queue and returns it. It blocks if the
// activeQ is empty and waits until a new item is added to the queue. It
// increments scheduling cycle when a pod is popped.
func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {p.lock.Lock()defer p.lock.Unlock()for p.activeQ.Len() == 0 {// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.// When Close() is called, the p.closed is set and the condition is broadcast,// which causes this loop to continue and return from the Pop().if p.closed {return nil, fmt.Errorf(queueClosed)}p.cond.Wait()}obj, err := p.activeQ.Pop()if err != nil {return nil, err}pInfo := obj.(*framework.QueuedPodInfo)pInfo.Attempts++p.schedulingCycle++return pInfo, nil
}

可以看到如果activeQ中没有需要调度的Pod了那么就会使用p.cond.Wait来进行等待,否则就冲activeQ中Pop一个元素QueuedPodInfo,同时这个QueuedPodInfo 的Attempts会+1,整个队列中的schedulingCycle也会增加。

队列增加新的待调度的Pod

其代码在pkg/scheduler/internal/queue/scheduling_queue.go:398

// Add adds a pod to the active queue. It should be called only when a new pod
// is added so there is no chance the pod is already in active/unschedulable/backoff queues
func (p *PriorityQueue) Add(pod *v1.Pod) error {p.lock.Lock()defer p.lock.Unlock()pInfo := p.newQueuedPodInfo(pod)gated := pInfo.Gatedif added, err := p.addToActiveQ(pInfo); !added {return err}if p.unschedulablePods.get(pod) != nil {klog.ErrorS(nil, "Error: pod is already in the unschedulable queue", "pod", klog.KObj(pod))p.unschedulablePods.delete(pod, gated)}// Delete pod from backoffQ if it is backing offif err := p.podBackoffQ.Delete(pInfo); err == nil {klog.ErrorS(nil, "Error: pod is already in the podBackoff queue", "pod", klog.KObj(pod))}klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", PodAdd, "queue", activeQName)metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()p.addNominatedPodUnlocked(pInfo.PodInfo, nil)p.cond.Broadcast()return nil
}

主要是将要加入的pod转化为QueuedPodInfo类型,然后添加到activeQ队列中,还需要检查其他队列中是否有这个pod,如果有就删除,同时做一些日志相关记录,然后还会调用p.cond.Broadcast()来解除上述提到的p.cond.Wait 的等待。

pod调度失败返回队列的处理

当Pod调度失败后,会调用来AddUnschedulableIfNotPresent函数来进行处理,其代码位置在pkg/scheduler/internal/queue/scheduling_queue.go 中。

// AddUnschedulableIfNotPresent inserts a pod that cannot be scheduled into
// the queue, unless it is already in the queue. Normally, PriorityQueue puts
// unschedulable pods in `unschedulablePods`. But if there has been a recent move
// request, then the pod is put in `podBackoffQ`.
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {p.lock.Lock()defer p.lock.Unlock()pod := pInfo.Podif p.unschedulablePods.get(pod) != nil {return fmt.Errorf("Pod %v is already present in unschedulable queue", klog.KObj(pod))}if _, exists, _ := p.activeQ.Get(pInfo); exists {return fmt.Errorf("Pod %v is already present in the active queue", klog.KObj(pod))}if _, exists, _ := p.podBackoffQ.Get(pInfo); exists {return fmt.Errorf("Pod %v is already present in the backoff queue", klog.KObj(pod))}// Refresh the timestamp since the pod is re-added.pInfo.Timestamp = p.clock.Now()// If a move request has been received, move it to the BackoffQ, otherwise move// it to unschedulablePods.for plugin := range pInfo.UnschedulablePlugins {metrics.UnschedulableReason(plugin, pInfo.Pod.Spec.SchedulerName).Inc()}if p.moveRequestCycle >= podSchedulingCycle {if err := p.podBackoffQ.Add(pInfo); err != nil {return fmt.Errorf("error adding pod %v to the backoff queue: %v", klog.KObj(pod), err)}klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", backoffQName)metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", ScheduleAttemptFailure).Inc()} else {p.unschedulablePods.addOrUpdate(pInfo)klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", ScheduleAttemptFailure, "queue", unschedulablePods)metrics.SchedulerQueueIncomingPods.WithLabelValues("unschedulable", ScheduleAttemptFailure).Inc()}p.addNominatedPodUnlocked(pInfo.PodInfo, nil)return nil
}

这里首先检查了其他队列中是否含有该pod,如果有就返回错误,然后比较moveRequestCyclepodSchedulingCycle ,如果p.moveRequestCycle >= podSchedulingCycle 那就说明在刚刚调度这个pod的时候集群发生了变化,可能现在可以成功调度这个pod了,将其转入backoffQ中,不然就正常加入unschedulableQ中。

flushBackoffQCompleted

在队列运行时会初始化两个go协程,来分别不停检查backoffQunschedulableQ,以及时将相关的Pod移出。代码在pkg/scheduler/internal/queue/scheduling_queue.go:333

// Run starts the goroutine to pump from podBackoffQ to activeQ
func (p *PriorityQueue) Run() {go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)go wait.Until(p.flushUnschedulablePodsLeftover, 30*time.Second, p.stop)
}

对于flushBackoffQCompleted即是每1s运行一次,直到接收到p.stop信息。对flushBackoffQCompleted 函数的具体定义在pkg/scheduler/internal/queue/scheduling_queue.go:537中,如下

// flushBackoffQCompleted Moves all pods from backoffQ which have completed backoff in to activeQ
func (p *PriorityQueue) flushBackoffQCompleted() {p.lock.Lock()defer p.lock.Unlock()activated := falsefor {rawPodInfo := p.podBackoffQ.Peek()if rawPodInfo == nil {break}pInfo := rawPodInfo.(*framework.QueuedPodInfo)pod := pInfo.Podif p.isPodBackingoff(pInfo) {break}_, err := p.podBackoffQ.Pop()if err != nil {klog.ErrorS(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))break}if err := p.activeQ.Add(pInfo); err != nil {klog.ErrorS(err, "Error adding pod to the active queue", "pod", klog.KObj(pInfo.Pod))} else {klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pod), "event", BackoffComplete, "queue", activeQName)metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()activated = true}}if activated {p.cond.Broadcast()}
}

其主要内容就是从backOffQ的首个元素开始查看,检查器是否已经过了避让时间,如果过了就将其放入到activeQ队列中,直到首个元素没有达到避让时间或者队列为空。

flushUnschedulablePodsLeftover

flushUnschedulablePodsLeftover每30s运行一次,这部分的代码在pkg/scheduler/internal/queue/scheduling_queue.go:572中,如下

// flushUnschedulablePodsLeftover moves pods which stay in unschedulablePods
// longer than podMaxInUnschedulablePodsDuration to backoffQ or activeQ.
func (p *PriorityQueue) flushUnschedulablePodsLeftover() {p.lock.Lock()defer p.lock.Unlock()var podsToMove []*framework.QueuedPodInfocurrentTime := p.clock.Now()for _, pInfo := range p.unschedulablePods.podInfoMap {lastScheduleTime := pInfo.Timestampif currentTime.Sub(lastScheduleTime) > p.podMaxInUnschedulablePodsDuration {podsToMove = append(podsToMove, pInfo)}}if len(podsToMove) > 0 {p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)}
}

可以看到其主要作用是遍历所有的pod,如果其在unschedulableQ中呆的时间如果超过了最大的p.podMaxInUnschedulablePodsDuration时间,就会将其移出去,至于是移动到activeQ中还是移动到backoffQ中,取决于movePodsToActiveOrBackoffQueue函数,在pkg/scheduler/internal/queue/scheduling_queue.go:771中,如下

// NOTE: this function assumes lock has been acquired in caller
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event framework.ClusterEvent) {activated := falsefor _, pInfo := range podInfoList {// If the event doesn't help making the Pod schedulable, continue.// Note: we don't run the check if pInfo.UnschedulablePlugins is nil, which denotes// either there is some abnormal error, or scheduling the pod failed by plugins other than PreFilter, Filter and Permit.// In that case, it's desired to move it anyways.if len(pInfo.UnschedulablePlugins) != 0 && !p.podMatchesEvent(pInfo, event) {continue}pod := pInfo.Podif p.isPodBackingoff(pInfo) {if err := p.podBackoffQ.Add(pInfo); err != nil {klog.ErrorS(err, "Error adding pod to the backoff queue", "pod", klog.KObj(pod))} else {klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQName)metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event.Label).Inc()p.unschedulablePods.delete(pod, pInfo.Gated)}} else {gated := pInfo.Gatedif added, _ := p.addToActiveQ(pInfo); added {klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQName)activated = truemetrics.SchedulerQueueIncomingPods.WithLabelValues("active", event.Label).Inc()p.unschedulablePods.delete(pod, gated)}}}p.moveRequestCycle = p.schedulingCycleif activated {p.cond.Broadcast()}
}

注意这个函数不仅仅是在flushUnschedulablePodsLeftover中被调用,还会在处理其他移动请求时触发,只不过这里的移动请求是UnschedulableTimeout ,判断到底是如何移动也很容易从代码中看出,如果已经到达了避让时间,就加入到activeQ中,如果没有就加入到backoffQ中,注意到如果有移动进activeQ中,也是需要执行p.cond.Broadcast(),同时注意到这里更新了moveRequestCycleschedulingCycle,这也是其统一更新moveRequestCycle 的地方。

调度队列总结

考虑到调度队列的细节,我们可以用下图来对其进行归纳回顾。

请添加图片描述

相关文章:

K8s源码分析(二)-K8s调度队列介绍

本文首发在个人博客上&#xff0c;欢迎来踩&#xff01; 本次分析参考的K8s版本是 文章目录 调度队列简介调度队列源代码分析队列初始化QueuedPodInfo元素介绍ActiveQ源代码介绍UnschedulableQ源代码介绍**BackoffQ**源代码介绍队列弹出待调度的Pod队列增加新的待调度的Podpod调…...

OpenGL ES 面试高频知识点(二)

说说纹理常用的采样方式? 最邻近点采样(GL_NEAREST)和双线性采样(GL_LINEAR)。 GL_NEAREST 采样是 OpenGL 默认的纹理采样方式,OpenGL 会选择中心点最接近纹理坐标的那个像素,纹理放大的时候会有锯齿感或者颗粒感。 **GL_LINEAR 采样会基于纹理坐标附近的纹理像素,计…...

2024第十六届“中国电机工程学会杯”数学建模A题B题思路分析

文章目录 1 赛题思路2 比赛日期和时间3 竞赛信息4 建模常见问题类型4.1 分类问题4.2 优化问题4.3 预测问题4.4 评价问题 5 建模资料 1 赛题思路 (赛题出来以后第一时间在CSDN分享) https://blog.csdn.net/dc_sinor?typeblog 2 比赛日期和时间 报名截止时间&#xff1a;2024…...

面向对象的三大特性:封装、继承、多态

一、封装 封装是面向对象的核心思想。是以类为载体&#xff0c;将对象的属性和行为封装起来&#xff0c;对外隐藏其实现细节。 封装保证了类内部数据结构的完整性&#xff0c;使得外部&#xff08;使用该类的用户&#xff09;不能轻易地直接操作此数据结构&#xff0c;只能执…...

目标检测YOLO实战应用案例100讲-基于深度学习的交通场景多尺度目标检测算法研究与应用(中)

目录 3.4 实验结果与分析 深度融合注意力跨尺度复合空洞残差交通目标检测算法...

前端GET请求下载后端返回数据流文件,并且处理window.open方法跳转白屏方法

平时常用导出都是用window.open方法 点击跳转连接&#xff1a;使用 window.open 下载 const downError 地址?&参数${参数|| }; const downError Url/xxx/xxx?&orgId${orgId || };window.open(downError, "_self");//调用window.open方法导出 而使用…...

SD321放大器3V输入电流电压保护二极管25C电源电流

Sd 321运算放大器可以在单电源或双电源电压下工作&#xff0c; 可以使用最坏情况下的非反相单位增益连接来适应。如 具有真微分输入&#xff0c;并且保持在线性模式&#xff0c;输入共模电压 果放大器必须驱动较大的负载电容&#xff0c;则应使用较大的闭 为0。Vpc-这种放大器可…...

geoserver SQL注入、Think PHP5 SQL注入、spring命令注入

文章目录 一、geoserver SQL注入CVE-2023-25157二、Think PHP5 SQL注入三、Spring Cloud Function SpEL表达式命令注入&#xff08;CVE-2022-22963&#xff09; 一、geoserver SQL注入CVE-2023-25157 介绍&#xff1a;GeoServer是一个开源的地理信息系统&#xff08;GIS&#…...

scrapy的入门

今天我们先学习一下scrapy的入门,Scrapy是一个快速的高层次的网页爬取和网页抓取框架&#xff0c;用于爬取网站并从页面中提取结构化的数据。 1. scrapy的概念和流程 1.1 scrapy的概念 我们先来了解一下scrapy的概念,什么是scrapy: Scrapy是一个Python编写的开源网络爬虫框架…...

大数据Scala教程从入门到精通第七篇:Scala在IDEA中编写Hello World

一&#xff1a;Scala在IDEA中编写Hello World 想让我们的idea支持scala的编写&#xff0c;需要安装一个插件。...

设计模式之数据访问对象模式

在Java编程的浩瀚星海中&#xff0c;有一个模式低调却强大&#xff0c;它像是一位默默无闻的超级英雄&#xff0c;支撑起无数应用的数据脊梁——那就是数据访问对象&#xff08;DAO, Data Access Object&#xff09;模式&#xff01;想象一下&#xff0c;如果你能像操纵魔法一样…...

Spring aop切面编程

Spring aop切面编程 如何使用利用AuditAction创建切入点 如何使用 Aspect // 1. 创建一个类&#xff0c;用Aspect注解标记它&#xff0c;表明这是一个切面类。 Component public class LoggingAspect {// 2. 定义切点&#xff1a;在通知方法上&#xff0c;使用切点表达式来指定…...

如何更好地使用Kafka? - 事先预防篇

要确保Kafka在使用过程中的稳定性&#xff0c;需要从kafka在业务中的使用周期进行依次保障。主要可以分为&#xff1a;事先预防&#xff08;通过规范的使用、开发&#xff0c;预防问题产生&#xff09;、运行时监控&#xff08;保障集群稳定&#xff0c;出问题能及时发现&#…...

如何解决 IPA 打包过程中的 “Invalid Bundle Structure“ 错误

哈喽&#xff0c;大家好呀&#xff0c;淼淼又来和大家见面啦&#xff0c;咱们行业内的应该都知道&#xff0c;在开发 iOS 应用时&#xff0c;将应用打包成 IPA 文件是常见的步骤之一。最近很多小伙伴们说在打包过程中&#xff0c;有时会遇到 "Invalid Bundle Structure&qu…...

Vuex:Vue.js 的状态管理库

一、Vuex 简介 Vuex 是 Vue.js 的状态管理模式和库。它采用集中式存储管理应用的所有组件的状态&#xff0c;并以相应的规则保证状态以一种可预测的方式发生变化。Vuex 的出现解决了组件间共享状态的问题&#xff0c;使得状态管理变得简单、可预测和可维护。 二、Vuex 核心概…...

【简单介绍下Sass】

&#x1f3a5;博主&#xff1a;程序员不想YY啊 &#x1f4ab;CSDN优质创作者&#xff0c;CSDN实力新星&#xff0c;CSDN博客专家 &#x1f917;点赞&#x1f388;收藏⭐再看&#x1f4ab;养成习惯 ✨希望本文对您有所裨益&#xff0c;如有不足之处&#xff0c;欢迎在评论区提出…...

IM 是什么?

在当今数字化的时代&#xff0c;即时通讯&#xff08;IM&#xff09;已经渗透到人们的日常生活和企业的工作环境中。IM技术的快速i发展为人们提供了一种高效、便捷的沟通方式&#xff0c;不仅推动了社会的信息化进程&#xff0c;也提升了企业的协同效率和竞争力。 作为企业级I…...

俄罗斯方块的代码实现

文章目录 首先是头文件的引入部分接下来是一些预处理指令接下来定义了两个结构体&#xff1a;接下来是全局变量g_hConsoleOutput&#xff0c;用于存储控制台输出句柄。之后是一系列函数的声明最后是main函数源码 首先是头文件的引入部分 包括stdio.h、string.h、stdlib.h、tim…...

出海企业哪种组网方案更省事?

对于出海企业而言&#xff0c;建立跨地区的数据传输和协同工作至关重要&#xff0c;以提升运营效率。因此&#xff0c;网络构建变得迫在眉睫。通过构建企业组网&#xff0c;企业能够加强与海外分支、客户和合作伙伴之间的联系&#xff0c;加速海外业务的发展。 然而&#xff0c…...

triton编译学习

一 流程 Triton-MLIR: 从DSL到PTX - 知乎 (zhihu.com)https://zhuanlan.zhihu.com/p/671434808Superjomns blog | OpenAI/Triton MLIR 迁移工作简介https://superjom...

源码知识付费系统,在线教学平台需要优化什么?

在线教育关于广大的关注者而言属于快捷度非常高的传达途径&#xff0c;尤其是白日没有过多时间的上班族或学习繁忙的学生&#xff0c;均能够通过可靠的在线教育完结自己的目的。如此巨大的市场潜力使得以在线教育为主的公司数量呈现出直线上升的趋势&#xff0c;很多的在线教育…...

后端常用技能:解决java项目前后端传输数据中文出现乱码、问号问题

0. 问题背景 最近做一个解析数据的小工具&#xff0c;本地运行时都正常&#xff0c;发布到服务器上后在导出文件数据时发现中文全部变成了问号&#xff0c;特此记录下问题解决的思路和过程 1. 环境 java 1.8 springboot 2.6.13 额外引入了fastjson&#xff0c;commons-csv等…...

SpringBoot中使用MongoDB

目录 搭建实体类 基本的增删改查操作 分页查询 使用MongoTemplate实现复杂的功能 引入依赖 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-mongodb</artifactId> </dependency> 在ap…...

【TS】入门

创建项目 vscode自动编译ts 生成配置文件 tsc --init 然后发现终端也改变了&#xff1a;...

Apache ECharts

Apache ECharts介绍&#xff1a; Apache ECharts 是一款基于 Javascript 的数据可视化图表库&#xff0c;提供直观&#xff0c;生动&#xff0c;可交互&#xff0c;可个性化定制的数据可视化图表。 官网地址&#xff1a;https://echarts.apache.org/zh/index.html Apache ECh…...

超详细的胎教级Stable Diffusion使用教程(四)

这套课程分为五节课&#xff0c;会系统性的介绍sd的全部功能和实操案例&#xff0c;让你打下坚实牢靠的基础 一、为什么要学Stable Diffusion&#xff0c;它究竟有多强大&#xff1f; 二、三分钟教你装好Stable Diffusion 三、小白快速上手Stable Diffusion 四、Stable dif…...

串口属性中的BM延时计时器问题

如果使用程序修改则需要修改注册表对应位置如下 第一个示例&#xff08;217&#xff09; 第二个示例&#xff08;219&#xff09; 需要注意的事情是修改前必须点查看串口名称&#xff08;例如上图是com5&#xff09; 程序修改&#xff1a; 有没有办法以编程方式更改USB <…...

PyQt6--Python桌面开发(8.QPlainTextEdit纯文本控件)

QPlainTextEdit纯文本控件...

Java | Leetcode Java题解之第83题删除排序链表中的重复元素

题目&#xff1a; 题解&#xff1a; class Solution {public ListNode deleteDuplicates(ListNode head) {if (head null) {return head;}ListNode cur head;while (cur.next ! null) {if (cur.val cur.next.val) {cur.next cur.next.next;} else {cur cur.next;}}return…...

重生奇迹mu再生宝石怎么用有什么用

重生奇迹mu再生宝石有2个用处&#xff1a; 1、在玛雅哥布林处给380装备加PVP属性4追4以上的380级装备,守护宝石一颗,再生宝石一颗,成功得到PVP装备,失败宝石消失,装备无变化&#xff1b; 2、给非套装点强化属性用法跟祝福,灵魂,生命一样直接往装备上敲,成功得到随机强化属性一…...