No Secrets Before the Source Code: A Walkthrough of the Kubernetes Scheduler
This article walks through the implementation of kube-scheduler, the default Kubernetes scheduler. The code is based on Kubernetes v1.18.4-rc.0.
0. Entry Point
The entry function is main() in kubernetes/cmd/kube-scheduler/scheduler.go:
func main() {
    rand.Seed(time.Now().UnixNano())

    command := app.NewSchedulerCommand()

    // TODO: once we switch everything over to Cobra commands, we can go back to calling
    // utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
    // normalize func and add the go flag set by hand.
    pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
    // utilflag.InitFlags()
    logs.InitLogs()
    defer logs.FlushLogs()

    if err := command.Execute(); err != nil {
        os.Exit(1)
    }
}
The core logic is: 1. create a SchedulerCommand (app.NewSchedulerCommand()); 2. parse the arguments and execute it (command.Execute()). Let's first look at how the SchedulerCommand is created.
// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
    opts, err := options.NewOptions()
    if err != nil {
        klog.Fatalf("unable to initialize command options: %v", err)
    }

    cmd := &cobra.Command{
        Use: "kube-scheduler",
        Long: `The Kubernetes scheduler is a policy-rich, topology-aware,
workload-specific function that significantly impacts availability, performance,
and capacity. The scheduler needs to take into account individual and collective
resource requirements, quality of service requirements, hardware/software/policy
constraints, affinity and anti-affinity specifications, data locality, inter-workload
interference, deadlines, and so on. Workload-specific requirements will be exposed
through the API as necessary. See [scheduling](https://kubernetes.io/docs/concepts/scheduling/)
for more information about scheduling and the kube-scheduler component.`,
        Run: func(cmd *cobra.Command, args []string) {
            if err := runCommand(cmd, args, opts, registryOptions...); err != nil {
                fmt.Fprintf(os.Stderr, "%v\n", err)
                os.Exit(1)
            }
        },
    }
    fs := cmd.Flags()
    ...
    return cmd
}
First, note that NewSchedulerCommand takes a variadic parameter, registryOptions. As the name suggests, these options act on a Registry, and the Registry is what Kubernetes uses to manage its scheduling plugins.
// Registry is a collection of all available plugins. The framework uses a
// registry to enable and initialize configured plugins.
// All plugins must be in the registry before initializing the framework.
type Registry map[string]PluginFactory
The options in registryOptions are an instance of the functional options pattern, first popularized by Rob Pike. In short, each optional parameter is wrapped in a function, the functions are passed to the target function, and the target function initializes itself by invoking each of them in turn. We will revisit this when we reach the RegistryOptions initialization; a minimal sketch of the pattern follows below. If you are interested in this approach, see my earlier article: http://legendtkl.com/2016/11/05/code-scalability/
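To make the pattern concrete, here is a minimal, self-contained sketch. The Server, WithPort, and WithTimeout names are invented purely for illustration and are not part of the scheduler code base:

package main

import (
    "fmt"
    "time"
)

// Option is a function that mutates a Server during construction.
type Option func(*Server)

// Server is a stand-in struct used only to demonstrate the pattern.
type Server struct {
    port    int
    timeout time.Duration
}

// WithPort and WithTimeout wrap optional parameters as functions.
func WithPort(p int) Option {
    return func(s *Server) { s.port = p }
}

func WithTimeout(d time.Duration) Option {
    return func(s *Server) { s.timeout = d }
}

// NewServer starts from sensible defaults and applies each option in turn.
func NewServer(opts ...Option) *Server {
    s := &Server{port: 8080, timeout: 30 * time.Second}
    for _, opt := range opts {
        opt(s)
    }
    return s
}

func main() {
    s := NewServer(WithPort(9090))
    fmt.Printf("%+v\n", *s)
}

kube-scheduler's registryOptions follow the same shape: as the Run function shown later makes clear, each Option is applied to the out-of-tree framework.Registry and may return an error.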
Next is cmd, a command-line handler built with cobra.Command. Command-line input is handled by the anonymous function assigned to Run, which in turn calls runCommand to start the scheduler process. With the less important logic stripped away, runCommand mainly builds the scheduler configuration and then starts the scheduler process via the Run function.
// runCommand runs the scheduler.
func runCommand(cmd *cobra.Command, args []string, opts *options.Options, registryOptions ...Option) error {
    ...
    // Build the scheduler configuration from the command-line options
    c, err := opts.Config()
    if err != nil {
        return err
    }

    // Get the completed config (fill in the remaining fields)
    cc := c.Complete()

    // Configz registration.
    if cz, err := configz.New("componentconfig"); err == nil {
        cz.Set(cc.ComponentConfig)
    } else {
        return fmt.Errorf("unable to register configz: %s", err)
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    return Run(ctx, cc, registryOptions...)
}
The main logic of the Run function is to:
- Initialize the Registry; the loop over outOfTreeRegistryOptions at the top of the function is where the functional options described above are applied.
- Create the scheduler instance.
- Perform the remaining initialization, including the EventBroadcaster, health checks, metrics, and so on.
- Start the Pod Informer to watch Pods.
- Run the scheduler (the code branches on whether leader election is enabled, but both branches end up calling sched.Run).
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, outOfTreeRegistryOptions ...Option) error {
    // To help debugging, immediately log version
    klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get())

    outOfTreeRegistry := make(framework.Registry)
    for _, option := range outOfTreeRegistryOptions {
        if err := option(outOfTreeRegistry); err != nil {
            return err
        }
    }

    recorderFactory := getRecorderFactory(&cc)
    // Create the scheduler.
    sched, err := scheduler.New(cc.Client,
        cc.InformerFactory,
        cc.PodInformer,
        recorderFactory,
        ctx.Done(),
        scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
        scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
        scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
        scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
        scheduler.WithBindTimeoutSeconds(cc.ComponentConfig.BindTimeoutSeconds),
        scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
        scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
        scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
        scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
    )
    if err != nil {
        return err
    }

    // Prepare the event broadcaster.
    if cc.Broadcaster != nil && cc.EventClient != nil {
        cc.Broadcaster.StartRecordingToSink(ctx.Done())
    }
    if cc.CoreBroadcaster != nil && cc.CoreEventClient != nil {
        cc.CoreBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")})
    }

    // Setup healthz checks.
    var checks []healthz.HealthChecker
    if cc.ComponentConfig.LeaderElection.LeaderElect {
        checks = append(checks, cc.LeaderElection.WatchDog)
    }

    // Start up the healthz server.
    if cc.InsecureServing != nil {
        separateMetrics := cc.InsecureMetricsServing != nil
        handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
        if err := cc.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil {
            return fmt.Errorf("failed to start healthz server: %v", err)
        }
    }
    if cc.InsecureMetricsServing != nil {
        handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil)
        if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil {
            return fmt.Errorf("failed to start metrics server: %v", err)
        }
    }
    if cc.SecureServing != nil {
        handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
        // TODO: handle stoppedCh returned by c.SecureServing.Serve
        if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
            // fail early for secure handlers, removing the old error loop from above
            return fmt.Errorf("failed to start secure server: %v", err)
        }
    }

    // Start all informers.
    go cc.PodInformer.Informer().Run(ctx.Done())
    cc.InformerFactory.Start(ctx.Done())

    // Wait for all caches to sync before scheduling.
    cc.InformerFactory.WaitForCacheSync(ctx.Done())

    // If leader election is enabled, runCommand via LeaderElector until done and exit.
    if cc.LeaderElection != nil {
        cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
            OnStartedLeading: sched.Run,
            OnStoppedLeading: func() {
                klog.Fatalf("leaderelection lost")
            },
        }
        leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
        if err != nil {
            return fmt.Errorf("couldn't create leader elector: %v", err)
        }

        leaderElector.Run(ctx)

        return fmt.Errorf("lost lease")
    }

    // Leader election is disabled, so runCommand inline until done.
    sched.Run(ctx)
    return fmt.Errorf("finished without leader elect")
}
The scheduler instance
First, let's look at the definition of the Scheduler, located at pkg/scheduler/scheduler.go.
// Scheduler watches for unscheduled Pods, finds a suitable Node for each of them,
// and writes the binding back to the api server.
type Scheduler struct {
    // Scheduler cache
    SchedulerCache internalcache.Cache

    Algorithm core.ScheduleAlgorithm

    // PodConditionUpdater is used only in case of scheduling errors. If we succeed
    // with scheduling, PodScheduled condition will be updated in apiserver in /bind
    // handler so that binding and setting PodCondition it is atomic.
    podConditionUpdater podConditionUpdater

    // Used during preemption to evict Pods and update the preemptor's 'NominatedNode' field.
    podPreemptor podPreemptor

    // Returns the next Pod to schedule, blocking when there is no Pod to schedule.
    // A channel is not used here because scheduling may take a while, and the designers
    // did not want Pods to sit in a channel during that time. Note: although not stated
    // officially, another possible reason is that a channel cannot persist its data.
    NextPod func() *framework.PodInfo

    // Error is called if there is an error. It is passed the pod in
    // question, and the error
    Error func(*framework.PodInfo, error)

    // An empty-struct channel used to signal shutdown; an idiomatic Go pattern.
    StopEverything <-chan struct{}

    // Handles PVC/PV binding.
    VolumeBinder scheduling.SchedulerVolumeBinder

    // Whether Pod preemption is disabled.
    DisablePreemption bool

    // The scheduling queue; all Pods that need to be scheduled live here.
    // Internally it is a priority queue.
    SchedulingQueue internalqueue.SchedulingQueue

    // Profiles are the scheduling profiles.
    Profiles profile.Map

    scheduledPodsHasSynced func() bool
}
Running the scheduler
Next, let's look at the scheduler's Run method.
// Run begins watching and scheduling. It waits for cache to be synced,
// then starts scheduling and blocks until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
    if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
        return
    }
    sched.SchedulingQueue.Run()
    wait.UntilWithContext(ctx, sched.scheduleOne, 0)
    sched.SchedulingQueue.Close()
}
The scheduler's Run function does three main things:
- Wait for the scheduler cache to sync (the scheduler has just started, so this is effectively a cold start).
- Run the scheduling queue's Run function.
- Run the scheduler's scheduleOne function in a loop.
The scheduling queue
The scheduling queue's Run function may be puzzling at first: does a queue really need to be started? It does. If the scheduling queue were just a single priority queue, there would be nothing to start, but the scheduling queue in Kubernetes is actually composed of three queues:
- activeQueue: Pods waiting to be scheduled; the scheduler consumes from this queue.
- backoffQueue: in Kubernetes, a failed scheduling attempt puts the Pod into backoff; the backoffQueue holds Pods that are currently backing off. There is typically a cap on the backoff (in the spirit of a backoffLimit), and the interval between consecutive backoffs doubles each time.
- unschedulableQueue: Pods whose scheduling attempt was terminated as unschedulable. A simplified sketch of the PriorityQueue structure that ties these three sub-queues together follows this list.
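To make the three-queue layout concrete, below is a simplified sketch of the PriorityQueue type from pkg/scheduler/internal/queue/scheduling_queue.go. Only the fields used by the methods quoted in this article are shown, and the field comments are mine; treat it as an outline rather than the exact upstream definition:

// PriorityQueue (simplified): the "scheduling queue" is really three sub-queues.
type PriorityQueue struct {
    stop chan struct{}

    lock sync.RWMutex
    cond sync.Cond

    // activeQ holds Pods that are ready to be scheduled; the scheduler pops from here.
    activeQ *heap.Heap
    // podBackoffQ holds Pods that are backing off, ordered by backoff expiry.
    podBackoffQ *heap.Heap
    // unschedulableQ holds Pods that were tried and determined to be unschedulable.
    unschedulableQ *UnschedulablePodsMap

    // Bounds used by calculateBackoffDuration (see below).
    podInitialBackoffDuration time.Duration
    podMaxBackoffDuration     time.Duration
}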
What the scheduling queue's Run function does is periodically move Pods from the backoffQueue and the unschedulableQueue into the activeQueue.
// Run starts the goroutine to pump from podBackoffQ to activeQ
func (p *PriorityQueue) Run() {
    go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
    go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}
Here wait.Until is essentially a cron-like periodic runner; we won't go into its implementation details here.
// Until loops until stop channel is closed, running f every period.
//
// Until is syntactic sugar on top of JitterUntil with zero jitter factor and
// with sliding = true (which means the timer for period starts after the f
// completes).
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
    JitterUntil(f, period, 0.0, true, stopCh)
}
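As a quick illustration of how wait.Until is typically used, here is a standalone sketch (not code from the scheduler):

package main

import (
    "fmt"
    "time"

    "k8s.io/apimachinery/pkg/util/wait"
)

func main() {
    stopCh := make(chan struct{})

    // Call the function once per second until stopCh is closed.
    go wait.Until(func() {
        fmt.Println("tick:", time.Now().Format(time.RFC3339))
    }, time.Second, stopCh)

    // Let it run for a few seconds, then signal stop.
    time.Sleep(3 * time.Second)
    close(stopCh)
}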
Now let's look at the two flush functions, starting with flushBackoffQCompleted() and its helpers getBackoffTime() and calculateBackoffDuration(). The main logic is to repeatedly peek at the head of the backoff queue and, whenever that Pod's backoff period has completed, pop it and add it to the activeQueue:
// flushBackoffQCompleted Moves all pods from backoffQ which have completed backoff in to activeQ
func (p *PriorityQueue) flushBackoffQCompleted() {
    p.lock.Lock()
    defer p.lock.Unlock()

    for {
        rawPodInfo := p.podBackoffQ.Peek()
        if rawPodInfo == nil {
            return
        }
        pod := rawPodInfo.(*framework.PodInfo).Pod
        boTime := p.getBackoffTime(rawPodInfo.(*framework.PodInfo))
        if boTime.After(p.clock.Now()) {
            return
        }
        _, err := p.podBackoffQ.Pop()
        if err != nil {
            klog.Errorf("Unable to pop pod %v from backoff queue despite backoff completion.", nsNameForPod(pod))
            return
        }
        p.activeQ.Add(rawPodInfo)
        metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
        defer p.cond.Broadcast()
    }
}

// getBackoffTime returns the time that podInfo completes backoff
func (p *PriorityQueue) getBackoffTime(podInfo *framework.PodInfo) time.Time {
    duration := p.calculateBackoffDuration(podInfo)
    backoffTime := podInfo.Timestamp.Add(duration)
    return backoffTime
}

// calculateBackoffDuration is a helper function for calculating the backoffDuration
// based on the number of attempts the pod has made.
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.PodInfo) time.Duration {
    duration := p.podInitialBackoffDuration
    for i := 1; i < podInfo.Attempts; i++ {
        duration = duration * 2
        if duration > p.podMaxBackoffDuration {
            return p.podMaxBackoffDuration
        }
    }
    return duration
}
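To get a feel for the numbers: assuming an initial backoff of 1 second and a maximum of 10 seconds (the values configured via PodInitialBackoffSeconds and PodMaxBackoffSeconds, which default to 1 and 10 in this version as far as I can tell), a Pod on its 1st through 5th scheduling attempt backs off for 1s, 2s, 4s, 8s, and then 10s, after which the backoff stays capped at 10s.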
Finally, let's see how Pods in the unschedulableQueue are flushed, i.e. the logic of flushUnschedulableQLeftover. It is very simple: a Pod that has stayed in the unschedulableQueue for more than 60 seconds is moved back to the activeQueue (or the backoffQueue, as the helper movePodsToActiveOrBackoffQueue suggests).
// flushUnschedulableQLeftover moves pod which stays in unschedulableQ longer than the unschedulableQTimeInterval
// to activeQ.
func (p *PriorityQueue) flushUnschedulableQLeftover() {
    p.lock.Lock()
    defer p.lock.Unlock()

    var podsToMove []*framework.PodInfo
    currentTime := p.clock.Now()
    for _, pInfo := range p.unschedulableQ.podInfoMap {
        lastScheduleTime := pInfo.Timestamp
        if currentTime.Sub(lastScheduleTime) > unschedulableQTimeInterval {
            podsToMove = append(podsToMove, pInfo)
        }
    }

    if len(podsToMove) > 0 {
        p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
    }
}

const (
    // If the pod stays in unschedulableQ longer than the unschedulableQTimeInterval,
    // the pod will be moved from unschedulableQ to activeQ.
    unschedulableQTimeInterval = 60 * time.Second

    queueClosed = "scheduling queue is closed"
)