Preface
The Scheduler is a Kubernetes component whose function and logic are relatively simple and self-contained. Its main job is to watch kube-apiserver for pods whose PodSpec.NodeName is empty, pick the best node for each such pod using predicate (filtering) and priority (scoring) algorithms, and finally bind the pod to that node so that the pod is scheduled to run there.
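To make the final bind step concrete, here is a minimal sketch (not the scheduler's actual implementation, and assuming a recent client-go where Bind takes a context) of binding an already-selected pod to a node through the pods/binding subresource:

import (
	"context"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// bindPod is an illustrative helper, not kube-scheduler code: it writes a
// Binding object to the pod's "binding" subresource, which is what ultimately
// sets PodSpec.NodeName on the pod.
func bindPod(ctx context.Context, client kubernetes.Interface, pod *v1.Pod, nodeName string) error {
	binding := &v1.Binding{
		ObjectMeta: metav1.ObjectMeta{Namespace: pod.Namespace, Name: pod.Name, UID: pod.UID},
		Target:     v1.ObjectReference{Kind: "Node", Name: nodeName},
	}
	return client.CoreV1().Pods(pod.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
}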
Expanding the scheduler part of the call flow above, its internal calls (see Kubernetes Scheduler) are shown in the figure below:
The scheduler ships with many built-in predicate and priority algorithms (see scheduler_algorithm), for example predicates such as NoDiskConflict, PodFitsResources, MatchNodeSelector and CheckNodeMemoryPressure, and priorities such as LeastRequestedPriority, BalancedResourceAllocation, CalculateAntiAffinityPriority and NodeAffinityPriority. In real production environments, however, we often need special scheduling policies, such as batch scheduling (aka coscheduling or gang scheduling), that the default Kubernetes scheduling policies cannot satisfy, and in that case we need to extend the scheduler to implement them.
Scheduler extension approaches
Kubernetes currently supports four ways to implement custom scheduling algorithms (predicates & priorities):
·default-scheduler recoding: add the algorithms directly to the default Kubernetes scheduler and recompile kube-scheduler
·standalone: implement a custom scheduler parallel to kube-scheduler that runs in the cluster either on its own or alongside the default kube-scheduler
·scheduler extender: implement a "scheduler extender" that kube-scheduler calls (http/https) as a supplement to the default scheduling algorithms (predicates & priorities & bind)
·scheduler framework: implement scheduler framework plugins and recompile kube-scheduler; similar to the first approach, but more standardized and plugin-based
The following sections walk through the principles of, and development guidance for, each of these approaches.
default-scheduler recoding
Let's start by analyzing the scheduling-related entry points in kube-scheduler:
Setting the default predicate & priority policies
See defaultPredicates and defaultPriorities (k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/defaults.go):
func init() {
	registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
}

func defaultPredicates() sets.String {
	return sets.NewString(
		predicates.NoVolumeZoneConflictPred,
		predicates.MaxEBSVolumeCountPred,
		predicates.MaxGCEPDVolumeCountPred,
		predicates.MaxAzureDiskVolumeCountPred,
		predicates.MaxCSIVolumeCountPred,
		predicates.MatchInterPodAffinityPred,
		predicates.NoDiskConflictPred,
		predicates.GeneralPred,
		predicates.PodToleratesNodeTaintsPred,
		predicates.CheckVolumeBindingPred,
		predicates.CheckNodeUnschedulablePred,
	)
}

func defaultPriorities() sets.String {
	return sets.NewString(
		priorities.SelectorSpreadPriority,
		priorities.InterPodAffinityPriority,
		priorities.LeastRequestedPriority,
		priorities.BalancedResourceAllocation,
		priorities.NodePreferAvoidPodsPriority,
		priorities.NodeAffinityPriority,
		priorities.TaintTolerationPriority,
		priorities.ImageLocalityPriority,
	)
}

func registerAlgorithmProvider(predSet, priSet sets.String) {
	// Registers algorithm providers. By default we use 'DefaultProvider', but user can specify one to be used
	// by specifying flag.
	scheduler.RegisterAlgorithmProvider(scheduler.DefaultProvider, predSet, priSet)
	// Cluster autoscaler friendly scheduling algorithm.
	scheduler.RegisterAlgorithmProvider(ClusterAutoscalerProvider, predSet,
		copyAndReplace(priSet, priorities.LeastRequestedPriority, priorities.MostRequestedPriority))
}

const (
	// DefaultProvider defines the default algorithm provider name.
	DefaultProvider = "DefaultProvider"
)
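For the recoding approach, the first step is simply making a custom algorithm name appear in one of these sets. Below is a minimal sketch living in the same package, where "MyProvider", "MyCustomPred" and "MyCustomPriority" are purely illustrative names (registering and implementing the two custom algorithms is sketched in the following sections); the new provider could then be selected via kube-scheduler's --algorithm-provider flag:

// Illustrative only: register an additional algorithm provider that extends the
// default predicate/priority sets with two hypothetical custom algorithms.
func init() {
	predSet := defaultPredicates()
	predSet.Insert("MyCustomPred") // hypothetical custom predicate name
	priSet := defaultPriorities()
	priSet.Insert("MyCustomPriority") // hypothetical custom priority name
	scheduler.RegisterAlgorithmProvider("MyProvider", predSet, priSet)
}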
Registering predicate and priority handler functions
Registering a predicate function (k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/register_predicates.go):
func init() {
	...
	// Fit is determined by resource availability.
	// This predicate is actually a default predicate, because it is invoked from
	// predicates.GeneralPredicates()
	scheduler.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)
}
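A custom predicate would be registered in exactly the same way. A minimal sketch, where MyCustomPred and predicates.MyCustomPredicate are hypothetical names (the predicate function itself is sketched later in this section):

// Illustrative only: register a hypothetical custom predicate under the name
// "MyCustomPred" so a predicate set / algorithm provider can reference it.
func init() {
	scheduler.RegisterFitPredicate("MyCustomPred", predicates.MyCustomPredicate)
}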
Registering a priority function (k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/register_priorities.go):
func init() {
	...
	// Prioritizes nodes that have labels matching NodeAffinity
	scheduler.RegisterPriorityMapReduceFunction(priorities.NodeAffinityPriority, priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1)
}
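Likewise for a custom priority. A minimal sketch, where MyCustomPriority and priorities.MyCustomPriorityMap are hypothetical names; nil is passed as the reduce function because this priority needs no reduce step, and the map function is sketched later in this section:

// Illustrative only: register a hypothetical custom priority with weight 1
// under the name "MyCustomPriority".
func init() {
	scheduler.RegisterPriorityMapReduceFunction("MyCustomPriority", priorities.MyCustomPriorityMap, nil, 1)
}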
Writing predicate and priority handler functions
The predicate function corresponding to PodFitsResourcesPred is as follows (k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go):
// PodFitsResources checks if a node has sufficient resources, such as cpu, memory, gpu, opaque int resources etc to run a pod.
// First return value indicates whether a node has sufficient resources to run a pod while the second return value indicates the
// predicate failure reasons if the node has insufficient resources to run the pod.
func PodFitsResources(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}

	var predicateFails []PredicateFailureReason
	allowedPodNumber := nodeInfo.AllowedPodNumber()
	if len(nodeInfo.Pods())+1 > allowedPodNumber {
		predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
	}

	// No extended resources should be ignored by default.
	ignoredExtendedResources := sets.NewString()

	var podRequest *schedulernodeinfo.Resource
	if predicateMeta, ok := meta.(*predicateMetadata); ok && predicateMeta.podFitsResourcesMetadata != nil {
		podRequest = predicateMeta.podFitsResourcesMetadata.podRequest
		if predicateMeta.podFitsResourcesMetadata.ignoredExtendedResources != nil {
			ignoredExtendedResources = predicateMeta.podFitsResourcesMetadata.ignoredExtendedResources
		}
	} else {
		// We couldn't parse metadata - fallback to computing it.
		podRequest = GetResourceRequest(pod)
	}
	if podRequest.MilliCPU == 0 &&
		podRequest.Memory == 0 &&
		podRequest.EphemeralStorage == 0 &&
		len(podRequest.ScalarResources) == 0 {
		return len(predicateFails) == 0, predicateFails, nil
	}

	allocatable := nodeInfo.AllocatableResource()
	if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
		predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
	}
	if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
		predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
	}
	if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
		predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceEphemeralStorage, podRequest.EphemeralStorage, nodeInfo.RequestedResource().EphemeralStorage, allocatable.EphemeralStorage))
	}

	for rName, rQuant := range podRequest.ScalarResources {
		if v1helper.IsExtendedResourceName(rName) {
			// If this resource is one of the extended resources that should be
			// ignored, we will skip checking it.
			if ignoredExtendedResources.Has(string(rName)) {
				continue
			}
		}
		if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
			predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName]))
		}
	}

	if klog.V(10) {
		if len(predicateFails) == 0 {
			// We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is
			// not logged. There is visible performance gain from it.
			klog.Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.",
				podName(pod), node.Name, len(nodeInfo.Pods()), allowedPodNumber)
		}
	}
	return len(predicateFails) == 0, predicateFails, nil
}
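A custom predicate only has to follow the same FitPredicate signature. Below is a minimal sketch (illustrative, not upstream code) that admits only nodes carrying a hypothetical label example.com/dedicated=batch; it would live in the predicates package and be registered under "MyCustomPred" as sketched earlier:

// MyCustomPredicate is an illustrative predicate: it reports a fit only for
// nodes labeled example.com/dedicated=batch, and otherwise returns a failure
// reason that the scheduler can surface in events and pod status.
func MyCustomPredicate(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
	node := nodeInfo.Node()
	if node == nil {
		return false, nil, fmt.Errorf("node not found")
	}
	if node.Labels["example.com/dedicated"] != "batch" {
		return false, []PredicateFailureReason{NewPredicateFailureError("MyCustomPred", "node is not dedicated to batch workloads")}, nil
	}
	return true, nil, nil
}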
The Map and Reduce functions for the NodeAffinityPriority priority are as follows (k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/node_affinity.go):
// CalculateNodeAffinityPriorityMap prioritizes nodes according to node affinity scheduling preferences
// indicated in PreferredDuringSchedulingIgnoredDuringExecution. Each time a node matches a preferredSchedulingTerm,
// it will get an add of preferredSchedulingTerm.Weight. Thus, the more preferredSchedulingTerms
// the node satisfies and the more the preferredSchedulingTerm that is satisfied weights, the higher
// score the node gets.
func CalculateNodeAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
	node := nodeInfo.Node()
	if node == nil {
		return framework.NodeScore{}, fmt.Errorf("node not found")
	}

	// default is the podspec.
	affinity := pod.Spec.Affinity
	if priorityMeta, ok := meta.(*priorityMetadata); ok {
		// We were able to parse metadata, use affinity from there.
		affinity = priorityMeta.affinity
	}

	var count int32
	// A nil element of PreferredDuringSchedulingIgnoredDuringExecution matches no objects.
	// An element of PreferredDuringSchedulingIgnoredDuringExecution that refers to an
	// empty PreferredSchedulingTerm matches all objects.
	if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
		// Match PreferredDuringSchedulingIgnoredDuringExecution term by term.
		for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
			preferredSchedulingTerm := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]
			if preferredSchedulingTerm.Weight == 0 {
				continue
			}

			// TODO: Avoid computing it for all nodes if this becomes a performance problem.
			nodeSelector, err := v1helper.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)
			if err != nil {
				return framework.NodeScore{}, err
			}

			if nodeSelector.Matches(labels.Set(node.Labels)) {
				count += preferredSchedulingTerm.Weight
			}
		}
	}

	return framework.NodeScore{
		Name:  node.Name,
		Score: int64(count),
	}, nil
}
// CalculateNodeAffinityPriorityReduce is a reduce function for node affinity priority calculation.
var CalculateNodeAffinityPriorityReduce = NormalizeReduce(framework.MaxNodeScore, false)
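A custom priority follows the same map (and optionally reduce) pattern. Below is a minimal sketch (illustrative, not upstream code) that gives the maximum score to nodes carrying a hypothetical example.com/fast-disk label and 0 to everything else; it would live in the priorities package and be registered under "MyCustomPriority" as sketched earlier:

// MyCustomPriorityMap is an illustrative priority map function: nodes carrying
// the example.com/fast-disk label get framework.MaxNodeScore, all others get 0.
func MyCustomPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
	node := nodeInfo.Node()
	if node == nil {
		return framework.NodeScore{}, fmt.Errorf("node not found")
	}
	var score int64
	if _, ok := node.Labels["example.com/fast-disk"]; ok {
		score = framework.MaxNodeScore
	}
	return framework.NodeScore{Name: node.Name, Score: score}, nil
}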