本篇文章介紹一下 Kubernetes 的默認調度器 kube-scheduler 的源碼實現。kubernetes 代碼版本:v1.18.4-rc.0。

0. 入口

入口函數在路徑 kubernetes/cmd/kube-scheduler/scheduler.go#main() ,如下

func main() {
	rand.Seed(time.Now().UnixNano())
	// Entry point of kube-scheduler: build the command, wire up flag normalization and logging, then execute it.
	command := app.NewSchedulerCommand()

	// TODO: once we switch everything over to Cobra commands, we can go back to calling
	// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
	// normalize func and add the go flag set by hand.
	pflag.CommandLine.SetNormalizeFunc(cliflag.WordSepNormalizeFunc)
	// utilflag.InitFlags()
	logs.InitLogs()
	defer logs.FlushLogs()

	if err := command.Execute(); err != nil {
		// os.Exit terminates the process immediately and does not run deferred
		// calls, so the deferred FlushLogs above would be skipped on this path.
		// Flush explicitly so that pending log output is not lost.
		logs.FlushLogs()
		os.Exit(1)
	}
}

核心邏輯就是:1. 創建一個 SchedulerCommand(第 4 行);2. 接收參數並執行(第 14 行)。我們先看一下創建 SchedulerCommand 的邏輯。

// NewSchedulerCommand creates a *cobra.Command object with default parameters and registryOptions
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
	opts, err := options.NewOptions()
	if err != nil {
		klog.Fatalf("unable to initialize command options: %v", err)
	}

	cmd := &cobra.Command{
		Use: "kube-scheduler",
		Long: `The Kubernetes scheduler is a policy-rich, topology-aware,
workload-specific function that significantly impacts availability, performance,
and capacity. The scheduler needs to take into account individual and collective
resource requirements, quality of service requirements, hardware/software/policy
constraints, affinity and anti-affinity specifications, data locality, inter-workload
interference, deadlines, and so on. Workload-specific requirements will be exposed
through the API as necessary. See [scheduling](https://kubernetes.io/docs/concepts/scheduling/)
for more information about scheduling and the kube-scheduler component.`,
		Run: func(cmd *cobra.Command, args []string) {
			// The Run callback has no return value, so a failure from runCommand
			// is printed to stderr here and the process exits with status 1.
			if err := runCommand(cmd, args, opts, registryOptions...); err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}
		},
	}
	// NOTE(review): the statements that consume fs are elided ("...") in this
	// excerpt; presumably they register the scheduler's flags on the command.
	fs := cmd.Flags()
	
  ...
  
	return cmd
}

首先我們可以看到 NewSchedulerCommand 接收一個不定參數 registryOptions。從名字可以看出,這個參數是作用於一個 Registry 的,而這個 Registry 實際上就是用來管理 Kubernetes 中的 plugin 的。

// Registry is a collection of all available plugins. The framework uses a
// registry to enable and initialize configured plugins.
// All plugins must be in the registry before initializing the framework.
//
// It maps a string key (presumably the plugin name; confirm against callers)
// to the PluginFactory stored under that key.
type Registry map[string]PluginFactory

而 registryOptions 中的 option 其實是一種函數傳參方式的應用。這種 option 傳參方式最早是由 Rob Pike 提出來的,簡單來說就是將可選的 option 參數封裝成多個函數傳給目標函數,然後在目標函數內部通過調用這些 option 函數來完成初始化。後面我們看到 RegistryOptions 初始化的部分再來介紹。對於 option 這種方式感興趣的同學可以參考我之前的一篇文章: http://legendtkl.com/2016/11/05/code-scalability/

其次是 cmd,通過 cobra.Command 構建出來的一個 CLI 處理工具,對於命令行的輸入通過第 18 行的匿名函數來處理,匿名函數內部會調用函數 runCommand 來啓動 scheduler 進程。去掉一些不重要的代碼邏輯,runCommand 主要做的事情就是創建 scheduler 參數,然後通過 Run 函數啓動 scheduler 進程。

// runCommand runs the scheduler.
//
// It builds the scheduler configuration from opts, completes it, publishes the
// component config through configz, and then calls Run with a cancellable
// context derived from context.Background(). It returns the error from
// opts.Config, a wrapped configz registration error, or whatever Run returns.
// NOTE(review): part of the body is elided ("...") in this excerpt.
func runCommand(cmd *cobra.Command, args []string, opts *options.Options, registryOptions ...Option) error {
	...
	
	// Build the scheduler configuration from the command-line options.
	c, err := opts.Config()
	if err != nil {
		return err
	}

	// Get the completed config
	// (Complete fills in the remaining configuration values.)
	cc := c.Complete()

	// Configz registration.
	if cz, err := configz.New("componentconfig"); err == nil {
		cz.Set(cc.ComponentConfig)
	} else {
		return fmt.Errorf("unable to register configz: %s", err)
	}

	// The context is cancelled when runCommand returns, i.e. once Run has returned.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	return Run(ctx, cc, registryOptions...)
}

Run 函數的主要邏輯如下:

  1. 初始化 Registry,第 6 ~ 11 行就是 option 這種函數傳參的處理邏輯。
  2. 創建 scheduler 實例
  3. 其他初始化操作,包括 EventBroadcaster、健康檢測、metric 等相關邏輯
  4. 啓動 Pod Informer 來監聽 Pod
  5. 運行調度器(分爲啓用和未啓用 leader 選舉兩種情況,但是對應的方法都是 sched.Run 方法)
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc schedulerserverconfig.CompletedConfig, outOfTreeRegistryOptions ...Option) error {
	// To help debugging, immediately log version
	klog.V(1).Infof("Starting Kubernetes Scheduler version %+v", version.Get())

	outOfTreeRegistry := make(framework.Registry)
	for _, option := range outOfTreeRegistryOptions {
		if err := option(outOfTreeRegistry); err != nil {
			return err
		}
	}
	// The loop above applies every Option function to the freshly created
	// out-of-tree registry; the first option that fails aborts startup.

	recorderFactory := getRecorderFactory(&cc)
	// Create the scheduler.
	// The settings taken from cc.ComponentConfig, together with the out-of-tree
	// registry built above, are handed to scheduler.New as option arguments.
	sched, err := scheduler.New(cc.Client,
		cc.InformerFactory,
		cc.PodInformer,
		recorderFactory,
		ctx.Done(),
		scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
		scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
		scheduler.WithPreemptionDisabled(cc.ComponentConfig.DisablePreemption),
		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
		scheduler.WithBindTimeoutSeconds(cc.ComponentConfig.BindTimeoutSeconds),
		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
		scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
	)
	if err != nil {
		return err
	}

	// Prepare the event broadcaster.
	// Each broadcaster is started only when both it and its event client are set.
	if cc.Broadcaster != nil && cc.EventClient != nil {
		cc.Broadcaster.StartRecordingToSink(ctx.Done())
	}
	if cc.CoreBroadcaster != nil && cc.CoreEventClient != nil {
		cc.CoreBroadcaster.StartRecordingToSink(&corev1.EventSinkImpl{Interface: cc.CoreEventClient.Events("")})
	}
	// Setup healthz checks.
	// The leader-election watchdog is added as a check only when leader
	// election is enabled in the component config.
	var checks []healthz.HealthChecker
	if cc.ComponentConfig.LeaderElection.LeaderElect {
		checks = append(checks, cc.LeaderElection.WatchDog)
	}

	// Start up the healthz server.
	if cc.InsecureServing != nil {
		// separateMetrics is true when a dedicated insecure metrics server is
		// configured; it is passed through to newHealthzHandler.
		separateMetrics := cc.InsecureMetricsServing != nil
		handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, separateMetrics, checks...), nil, nil)
		if err := cc.InsecureServing.Serve(handler, 0, ctx.Done()); err != nil {
			return fmt.Errorf("failed to start healthz server: %v", err)
		}
	}
	if cc.InsecureMetricsServing != nil {
		handler := buildHandlerChain(newMetricsHandler(&cc.ComponentConfig), nil, nil)
		if err := cc.InsecureMetricsServing.Serve(handler, 0, ctx.Done()); err != nil {
			return fmt.Errorf("failed to start metrics server: %v", err)
		}
	}
	if cc.SecureServing != nil {
		// Unlike the insecure handlers above, the secure handler chain is built
		// with the configured authenticator and authorizer.
		handler := buildHandlerChain(newHealthzHandler(&cc.ComponentConfig, false, checks...), cc.Authentication.Authenticator, cc.Authorization.Authorizer)
		// TODO: handle stoppedCh returned by c.SecureServing.Serve
		if _, err := cc.SecureServing.Serve(handler, 0, ctx.Done()); err != nil {
			// fail early for secure handlers, removing the old error loop from above
			return fmt.Errorf("failed to start secure server: %v", err)
		}
	}

	// Start all informers.
	// The pod informer is run in its own goroutine; the others are started
	// through cc.InformerFactory.
	go cc.PodInformer.Informer().Run(ctx.Done())
	cc.InformerFactory.Start(ctx.Done())

	// Wait for all caches to sync before scheduling.
	cc.InformerFactory.WaitForCacheSync(ctx.Done())

	// If leader election is enabled, run the scheduler via LeaderElector until done and exit.
	if cc.LeaderElection != nil {
		// sched.Run is the callback for when leadership starts; losing
		// leadership is fatal (klog.Fatalf).
		cc.LeaderElection.Callbacks = leaderelection.LeaderCallbacks{
			OnStartedLeading: sched.Run,
			OnStoppedLeading: func() {
				klog.Fatalf("leaderelection lost")
			},
		}
		leaderElector, err := leaderelection.NewLeaderElector(*cc.LeaderElection)
		if err != nil {
			return fmt.Errorf("couldn't create leader elector: %v", err)
		}

		// Whenever leaderElector.Run returns, this function reports an error.
		leaderElector.Run(ctx)

		return fmt.Errorf("lost lease")
	}

	// Leader election is disabled, so run the scheduler inline until done.
	sched.Run(ctx)
	return fmt.Errorf("finished without leader elect")
}

scheduler 實例

首先我們看一下 scheduler 的定義,路徑爲 pkg/scheduler/scheduler.go

// Scheduler watches for unscheduled pods, finds a suitable node for each of
// them, and writes the result back to the API server.
type Scheduler struct {
	// SchedulerCache is the scheduler's cache.
	SchedulerCache internalcache.Cache

	Algorithm core.ScheduleAlgorithm
	// PodConditionUpdater is used only in case of scheduling errors. If we succeed
	// with scheduling, PodScheduled condition will be updated in apiserver in /bind
	// handler so that binding and setting PodCondition it is atomic.
	podConditionUpdater podConditionUpdater
	// podPreemptor is used to evict pods in the preemption case and to update
	// the preemptor's 'NominatedNode' field.
	podPreemptor podPreemptor

	// NextPod returns the next pod that needs scheduling; it blocks when there
	// is no pod to schedule. A channel is not used here because scheduling may
	// take some time and the designers do not want pods to sit in a channel
	// during that time.
	// NOTE(review): although upstream does not say so, another possible reason
	// is that a channel cannot persist data.
	NextPod func() *framework.PodInfo

	// Error is called if there is an error. It is passed the pod in
	// question, and the error
	Error func(*framework.PodInfo, error)

	// StopEverything is an empty-struct channel used to signal that the
	// scheduler should stop, a common Go idiom.
	StopEverything <-chan struct{}

	// VolumeBinder handles PVCs/PVs.
	VolumeBinder scheduling.SchedulerVolumeBinder

	// DisablePreemption tells whether pod preemption is disabled.
	DisablePreemption bool

	// SchedulingQueue holds every pod that needs scheduling; internally it is
	// implemented as a priority queue.
	SchedulingQueue internalqueue.SchedulingQueue

	// Profiles are the scheduling profiles.
	Profiles profile.Map
	
	// scheduledPodsHasSynced is the predicate that Run passes to
	// cache.WaitForCacheSync before it starts scheduling.
	scheduledPodsHasSynced func() bool
}

運行調度器

下面看一下調度器的 Run 方法的實現。

// Run is the scheduler's main loop. It first waits for the scheduled-pods
// cache to sync and gives up immediately if that wait reports failure. It then
// starts the scheduling queue, invokes scheduleOne repeatedly until ctx is
// done, and finally closes the scheduling queue.
func (sched *Scheduler) Run(ctx context.Context) {
	stopCh := ctx.Done()
	if synced := cache.WaitForCacheSync(stopCh, sched.scheduledPodsHasSynced); !synced {
		return
	}

	sched.SchedulingQueue.Run()
	// A zero period means scheduleOne is called again as soon as it returns.
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}

在 scheduler 的 Run 函數中主要做了三件事情:

  1. 等待 scheduler cache 同步(scheduler 剛起來,相當於冷啓動)
  2. 運行調度器隊列的 Run 函數
  3. 運行 scheduler 的 scheduleOne 函數

調度隊列

調度隊列的 Run 函數第一次看到總是給你一點點疑惑,作爲一個隊列難道還需要啓動嗎?確實是這樣,如果調度隊列只是一個優先級隊列,那麼確實不需要啓動。kubernetes 中的調度隊列是由三個隊列組成,分別是:

  • activeQueue:待調度的 pod 隊列,scheduler 會監聽這個隊列
  • backoffQueue:在 kubernetes 中,如果調度失敗了,就相當於一次 backoff。backoffQueue 專門用來存放 backoff 的 pod。一般會有一個 backoffLimit 的限制就是最多容忍多少次 backoff。其次每次 backoff 的等待時間成倍增長,從 podInitialBackoffDuration 開始,最大不超過 podMaxBackoffDuration(見下文的 calculateBackoffDuration)。
  • unschedulableQueue:調度過程被終止的 pod 存放的隊列。

調度隊列的 Run 函數做的事情就是將 backoffQueue 和 unschedulableQueue 中 pod 定期移動到 activeQueue 中。

// Run launches the queue's two background flush loops, each in its own
// goroutine and each driven by wait.Until with p.stop as the stop channel:
// flushBackoffQCompleted runs every second and flushUnschedulableQLeftover
// runs every 30 seconds.
func (p *PriorityQueue) Run() {
	const (
		backoffFlushPeriod       = time.Second
		unschedulableFlushPeriod = 30 * time.Second
	)

	go wait.Until(p.flushBackoffQCompleted, backoffFlushPeriod, p.stop)
	go wait.Until(p.flushUnschedulableQLeftover, unschedulableFlushPeriod, p.stop)
}

其中 wait.Until 實際上就是一個類似 Cron 的定時調度器。細節實現暫時不細說了。

// Until loops until stop channel is closed, running f every period.
//
// Until is syntactic sugar on top of JitterUntil with zero jitter factor and
// with sliding = true (which means the timer for period starts after the f
// completes).
//
// Parameters:
//   - f: the function to run on every iteration.
//   - period: the interval handed to JitterUntil.
//   - stopCh: the channel whose closing ends the loop.
func Until(f func(), period time.Duration, stopCh <-chan struct{}) {
	// 0.0 is the jitter factor and true is the sliding flag.
	JitterUntil(f, period, 0.0, true, stopCh)
}

我們再來看一下兩個 flush 函數的邏輯。首先是 flushBackoffQCompleted() ,主要邏輯如下:

  • getBackoffTime():返回 pod 完成 backoff 的時間點
  • calculateBackoffDuration():根據 pod 已嘗試調度的次數計算 backoff 時長
// flushBackoffQCompleted moves all pods from backoffQ which have completed
// backoff in to activeQ.
//
// It holds p.lock for the whole call. Pods are taken from the head of
// podBackoffQ one at a time; the loop stops at the first pod whose backoff has
// not yet expired, when the queue is empty, or when a Pop fails. If at least
// one pod was moved, waiters on p.cond are woken once, still under the lock.
func (p *PriorityQueue) flushBackoffQCompleted() {
	p.lock.Lock()
	defer p.lock.Unlock()

	// Track whether anything was moved instead of deferring a Broadcast on every
	// iteration: a defer inside the loop would queue one deferred call per moved
	// pod, all of which only run at function return.
	moved := false
	for {
		rawPodInfo := p.podBackoffQ.Peek()
		if rawPodInfo == nil {
			break
		}
		pInfo := rawPodInfo.(*framework.PodInfo)
		if p.getBackoffTime(pInfo).After(p.clock.Now()) {
			// The head of the queue is still backing off; stop here.
			break
		}
		if _, err := p.podBackoffQ.Pop(); err != nil {
			klog.Errorf("Unable to pop pod %v from backoff queue despite backoff completion.", nsNameForPod(pInfo.Pod))
			break
		}
		p.activeQ.Add(rawPodInfo)
		metrics.SchedulerQueueIncomingPods.WithLabelValues("active", BackoffComplete).Inc()
		moved = true
	}

	if moved {
		// A single Broadcast wakes every waiter, so one call is enough no matter
		// how many pods were moved.
		p.cond.Broadcast()
	}
}

// getBackoffTime reports the instant at which podInfo finishes backing off:
// its Timestamp plus the duration computed by calculateBackoffDuration.
func (p *PriorityQueue) getBackoffTime(podInfo *framework.PodInfo) time.Time {
	return podInfo.Timestamp.Add(p.calculateBackoffDuration(podInfo))
}

// calculateBackoffDuration derives the backoff duration for podInfo from the
// number of attempts the pod has made. It starts from
// p.podInitialBackoffDuration and doubles once for every attempt after the
// first, returning p.podMaxBackoffDuration as soon as a doubling exceeds it.
func (p *PriorityQueue) calculateBackoffDuration(podInfo *framework.PodInfo) time.Duration {
	backoff, ceiling := p.podInitialBackoffDuration, p.podMaxBackoffDuration
	for attempt := 1; attempt < podInfo.Attempts; attempt++ {
		backoff *= 2
		if backoff > ceiling {
			return ceiling
		}
	}
	return backoff
}

下面我們看一下 unschedulableQueue 中的 pod 是如何 flush 的,也就是函數 flushUnschedulableQLeftover 的實現邏輯。邏輯非常簡單,如果 pod 在 unschedulableQueue 中停留時間超過了 60s,就會被移出 unschedulableQueue,移動到 activeQueue 或 backoffQueue(見 movePodsToActiveOrBackoffQueue)。

// flushUnschedulableQLeftover collects every pod that has stayed in
// unschedulableQ for longer than unschedulableQTimeInterval, measured from the
// pod's Timestamp, and passes the collected pods to
// movePodsToActiveOrBackoffQueue with the UnschedulableTimeout event. It holds
// p.lock for the whole call and does nothing when no pod has exceeded the
// interval.
func (p *PriorityQueue) flushUnschedulableQLeftover() {
	p.lock.Lock()
	defer p.lock.Unlock()

	now := p.clock.Now()
	var expired []*framework.PodInfo
	for _, info := range p.unschedulableQ.podInfoMap {
		if now.Sub(info.Timestamp) <= unschedulableQTimeInterval {
			continue
		}
		expired = append(expired, info)
	}

	if len(expired) == 0 {
		return
	}
	p.movePodsToActiveOrBackoffQueue(expired, UnschedulableTimeout)
}

const (
	// unschedulableQTimeInterval is the longest a pod may remain in
	// unschedulableQ; a pod that has stayed there longer than this is picked up
	// by flushUnschedulableQLeftover and moved out of unschedulableQ.
	unschedulableQTimeInterval = time.Minute

	// queueClosed is the message text for a closed scheduling queue.
	// NOTE(review): its use site is not shown in this excerpt.
	queueClosed = "scheduling queue is closed"
)
相關文章