Tencent Cloud: A One-Article Guide to Kubernetes Scheduler Extension (Part 1)

Source: Tencent Cloud Native
Author: 小小小小杜
Date: 2020-11-03
This article introduces the extension mechanisms of the Kubernetes Scheduler.

Preface

The Scheduler is one of the functionally simpler and more self-contained Kubernetes components. Its main job is to watch kube-apiserver for pods whose PodSpec.NodeName is empty, use the predicate (filtering) and priority (scoring) algorithms to pick the best node for each such pod, and finally bind the pod to that node so the pod is scheduled and runs there.
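To make that flow concrete, here is a highly simplified sketch of one scheduling cycle. The types and helper names below (pod, node, scheduleOne, bind) are illustrative stand-ins, not the actual kube-scheduler code, which operates on *v1.Pod and node objects from informer caches:

package schedulersketch

import (
  "fmt"
  "sort"
)

// Illustrative stand-ins for the real API objects.
type pod struct{ Name string }
type node struct{ Name string }

// predicate filters out nodes that cannot run the pod (filtering stage).
type predicate func(p pod, n node) bool

// priority scores a feasible node; higher is better (scoring stage).
type priority func(p pod, n node) int64

// scheduleOne mirrors the flow described above: filter nodes with predicates,
// score the survivors with priorities, then bind the pod to the best node.
func scheduleOne(p pod, nodes []node, preds []predicate, prios []priority, bind func(pod, node) error) error {
  var feasible []node
  for _, n := range nodes {
    fits := true
    for _, pred := range preds {
      if !pred(p, n) {
        fits = false
        break
      }
    }
    if fits {
      feasible = append(feasible, n)
    }
  }
  if len(feasible) == 0 {
    return fmt.Errorf("no node fits pod %s", p.Name)
  }
  scores := make(map[string]int64, len(feasible))
  for _, n := range feasible {
    for _, prio := range prios {
      scores[n.Name] += prio(p, n)
    }
  }
  // Pick the highest-scoring node; the real scheduler also handles tie-breaking.
  sort.Slice(feasible, func(i, j int) bool { return scores[feasible[i].Name] > scores[feasible[j].Name] })
  // In kube-scheduler, binding means POSTing a Binding object to kube-apiserver.
  return bind(p, feasible[0])
}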

(Figure: overall pod scheduling call flow)

Zooming in on the scheduler part of the call flow above, its internal calls (see Kubernetes Scheduler) look like this:

(Figure: kube-scheduler internal call details)

The scheduler ships with many built-in predicate and priority algorithms (see scheduler_algorithm), for example predicates such as NoDiskConflict, PodFitsResources, MatchNodeSelector and CheckNodeMemoryPressure, and priorities such as LeastRequestedPriority, BalancedResourceAllocation, CalculateAntiAffinityPriority and NodeAffinityPriority. In real production environments, however, we often need special scheduling policies, such as batch scheduling (aka coscheduling or gang scheduling), which the default Kubernetes scheduling policies cannot satisfy. In that case we need to extend the scheduler to implement the desired behavior.

scheduler extension approaches

Kubernetes currently supports four ways to implement custom scheduling algorithms (predicates & priorities):

·default-scheduler recoding: add the custom logic directly on top of the default Kubernetes scheduler and recompile kube-scheduler

·standalone: implement a custom scheduler that runs in parallel with kube-scheduler, either on its own or alongside the default kube-scheduler in the cluster

·scheduler extender: implement a "scheduler extender" that kube-scheduler calls over http/https as a supplement to the default scheduling algorithms (predicates & priorities & bind); a minimal sketch of this contract appears right after this list

·scheduler framework: implement scheduler framework plugins and recompile kube-scheduler; similar to the first approach, but more standardized and plugin-based
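As promised above, the following is a minimal sketch of a scheduler extender's filter endpoint. The request/response structs are simplified local definitions modeled on the extender v1 wire format (ExtenderArgs / ExtenderFilterResult) as the author understands it; the JSON field names, the "/filter" path and the toy node-name policy are illustrative assumptions and should be checked against your Kubernetes version and extender configuration:

package main

import (
  "encoding/json"
  "net/http"
  "strings"
)

// Simplified request/response shapes; only the fields used here are declared,
// and they mirror (but are not imported from) the scheduler extender v1 types.
type extenderArgs struct {
  Pod       json.RawMessage `json:"pod"`
  NodeNames *[]string       `json:"nodenames,omitempty"` // populated when the extender is nodeCacheCapable
}

type extenderFilterResult struct {
  NodeNames   *[]string         `json:"nodenames,omitempty"`
  FailedNodes map[string]string `json:"failedNodes,omitempty"`
  Error       string            `json:"error,omitempty"`
}

// filter keeps only nodes whose name contains "gpu" -- a toy policy that stands
// in for whatever custom predicate logic the extender is meant to add.
func filter(w http.ResponseWriter, r *http.Request) {
  var args extenderArgs
  if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
    json.NewEncoder(w).Encode(extenderFilterResult{Error: err.Error()})
    return
  }
  keep := []string{}
  failed := map[string]string{}
  if args.NodeNames != nil {
    for _, name := range *args.NodeNames {
      if strings.Contains(name, "gpu") {
        keep = append(keep, name)
      } else {
        failed[name] = "rejected by example extender"
      }
    }
  }
  json.NewEncoder(w).Encode(extenderFilterResult{NodeNames: &keep, FailedNodes: failed})
}

func main() {
  // kube-scheduler is pointed at this endpoint through its extender configuration
  // (urlPrefix + filterVerb); the exact config format depends on the scheduler version.
  http.HandleFunc("/filter", filter)
  http.ListenAndServe(":8888", nil)
}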

The sections below introduce the principles of, and development guidance for, each of these approaches.

default-scheduler recoding

Let's start by looking at the scheduling-related entry points of kube-scheduler:

Setting the default predicate & priority policies

See defaultPredicates and defaultPriorities (k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/defaults.go):

func init() {
  registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
}

func defaultPredicates() sets.String {
  return sets.NewString(
    predicates.NoVolumeZoneConflictPred,
    predicates.MaxEBSVolumeCountPred,
    predicates.MaxGCEPDVolumeCountPred,
    predicates.MaxAzureDiskVolumeCountPred,
    predicates.MaxCSIVolumeCountPred,
    predicates.MatchInterPodAffinityPred,
    predicates.NoDiskConflictPred,
    predicates.GeneralPred,
    predicates.PodToleratesNodeTaintsPred,
    predicates.CheckVolumeBindingPred,
    predicates.CheckNodeUnschedulablePred,
  )
}

func defaultPriorities() sets.String {
  return sets.NewString(
    priorities.SelectorSpreadPriority,
    priorities.InterPodAffinityPriority,
    priorities.LeastRequestedPriority,
    priorities.BalancedResourceAllocation,
    priorities.NodePreferAvoidPodsPriority,
    priorities.NodeAffinityPriority,
    priorities.TaintTolerationPriority,
    priorities.ImageLocalityPriority,
  )
}

func registerAlgorithmProvider(predSet, priSet sets.String) {
  // Registers algorithm providers. By default we use 'DefaultProvider', but user can specify one to be used
  // by specifying flag.
  scheduler.RegisterAlgorithmProvider(scheduler.DefaultProvider, predSet, priSet)
  // Cluster autoscaler friendly scheduling algorithm.
  scheduler.RegisterAlgorithmProvider(ClusterAutoscalerProvider, predSet,
    copyAndReplace(priSet, priorities.LeastRequestedPriority, priorities.MostRequestedPriority))
}

const (
  // DefaultProvider defines the default algorithm provider name.
  DefaultProvider = "DefaultProvider"
)

Registering the predicate and priority handler functions

Registering a predicate function (k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/register_predicates.go):

func init() {
  ...
  // Fit is determined by resource availability.
  // This predicate is actually a default predicate, because it is invoked from
  // predicates.GeneralPredicates()
  scheduler.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)
}

Registering a priority function (k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/register_priorities.go):

func init() {
  ...
  // Prioritizes nodes that have labels matching NodeAffinity
  scheduler.RegisterPriorityMapReduceFunction(priorities.NodeAffinityPriority, priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1)
}

Writing the predicate and priority handler functions

The predicate function behind PodFitsResourcesPred is as follows (k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go):

// PodFitsResources checks if a node has sufficient resources, such as cpu, memory, gpu, opaque int resources etc to run a pod.
// First return value indicates whether a node has sufficient resources to run a pod while the second return value indicates the
// predicate failure reasons if the node has insufficient resources to run the pod.
func PodFitsResources(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  node := nodeInfo.Node()
  if node == nil {
    return false, nil, fmt.Errorf("node not found")
  }

  var predicateFails []PredicateFailureReason
  allowedPodNumber := nodeInfo.AllowedPodNumber()
  if len(nodeInfo.Pods())+1 > allowedPodNumber {
    predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
  }

  // No extended resources should be ignored by default.
  ignoredExtendedResources := sets.NewString()

  var podRequest *schedulernodeinfo.Resource
  if predicateMeta, ok := meta.(*predicateMetadata); ok && predicateMeta.podFitsResourcesMetadata != nil {
    podRequest = predicateMeta.podFitsResourcesMetadata.podRequest
    if predicateMeta.podFitsResourcesMetadata.ignoredExtendedResources != nil {
      ignoredExtendedResources = predicateMeta.podFitsResourcesMetadata.ignoredExtendedResources
    }
  } else {
    // We couldn't parse metadata - fallback to computing it.
    podRequest = GetResourceRequest(pod)
  }
  if podRequest.MilliCPU == 0 &&
    podRequest.Memory == 0 &&
    podRequest.EphemeralStorage == 0 &&
    len(podRequest.ScalarResources) == 0 {
    return len(predicateFails) == 0, predicateFails, nil
  }

  allocatable := nodeInfo.AllocatableResource()
  if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
    predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
  }
  if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
    predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
  }
  if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
    predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceEphemeralStorage, podRequest.EphemeralStorage, nodeInfo.RequestedResource().EphemeralStorage, allocatable.EphemeralStorage))
  }

  for rName, rQuant := range podRequest.ScalarResources {
    if v1helper.IsExtendedResourceName(rName) {
      // If this resource is one of the extended resources that should be
      // ignored, we will skip checking it.
      if ignoredExtendedResources.Has(string(rName)) {
        continue
      }
    }
    if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
      predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName]))
    }
  }

  if klog.V(10) {
    if len(predicateFails) == 0 {
      // We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is
      // not logged. There is visible performance gain from it.
      klog.Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.",
        podName(pod), node.Name, len(nodeInfo.Pods()), allowedPodNumber)
    }
  }
  return len(predicateFails) == 0, predicateFails, nil
}

The Map and Reduce functions behind the NodeAffinityPriority priority are as follows (k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/node_affinity.go):

// CalculateNodeAffinityPriorityMap prioritizes nodes according to node affinity scheduling preferences
// indicated in PreferredDuringSchedulingIgnoredDuringExecution. Each time a node matches a preferredSchedulingTerm,
// it will get an add of preferredSchedulingTerm.Weight. Thus, the more preferredSchedulingTerms
// the node satisfies and the more the preferredSchedulingTerm that is satisfied weights, the higher
// score the node gets.
func CalculateNodeAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
  node := nodeInfo.Node()
  if node == nil {
    return framework.NodeScore{}, fmt.Errorf("node not found")
  }

  // default is the podspec.
  affinity := pod.Spec.Affinity
  if priorityMeta, ok := meta.(*priorityMetadata); ok {
    // We were able to parse metadata, use affinity from there.
    affinity = priorityMeta.affinity
  }

  var count int32
  // A nil element of PreferredDuringSchedulingIgnoredDuringExecution matches no objects.
  // An element of PreferredDuringSchedulingIgnoredDuringExecution that refers to an
  // empty PreferredSchedulingTerm matches all objects.
  if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
    // Match PreferredDuringSchedulingIgnoredDuringExecution term by term.
    for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
      preferredSchedulingTerm := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]
      if preferredSchedulingTerm.Weight == 0 {
        continue
      }

      // TODO: Avoid computing it for all nodes if this becomes a performance problem.
      nodeSelector, err := v1helper.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)
      if err != nil {
        return framework.NodeScore{}, err
      }
      if nodeSelector.Matches(labels.Set(node.Labels)) {
        count += preferredSchedulingTerm.Weight
      }
    }
  }

  return framework.NodeScore{
    Name:  node.Name,
    Score: int64(count),
  }, nil
}

// CalculateNodeAffinityPriorityReduce is a reduce function for node affinity priority calculation.
var CalculateNodeAffinityPriorityReduce = NormalizeReduce(framework.MaxNodeScore, false)
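
Putting the pieces above together, the "default-scheduler recoding" approach boils down to writing a function with the same signature as PodFitsResources, registering it the way register_predicates.go does, adding its name to defaultPredicates(), and recompiling kube-scheduler. The following is a rough sketch under the assumption that it lives in the same source tree as the snippets above; the predicate name CustomNodeLabelPred and the label-based rule are purely illustrative, not part of Kubernetes:

package defaults // assumed to sit next to register_predicates.go in the tree above

import (
  "fmt"

  v1 "k8s.io/api/core/v1"
  "k8s.io/kubernetes/pkg/scheduler"
  "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates"
  schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)

// CustomNodeLabelPred is the (hypothetical) name the predicate is registered under.
const CustomNodeLabelPred = "CustomNodeLabel"

// CustomNodeLabel only admits nodes that carry the label scheduling=allowed.
// It follows the same signature as PodFitsResources shown earlier.
func CustomNodeLabel(pod *v1.Pod, meta predicates.Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) {
  node := nodeInfo.Node()
  if node == nil {
    return false, nil, fmt.Errorf("node not found")
  }
  if node.Labels["scheduling"] == "allowed" {
    return true, nil, nil
  }
  // Reuse an existing failure reason purely for illustration.
  return false, []predicates.PredicateFailureReason{predicates.ErrNodeSelectorNotMatch}, nil
}

func init() {
  // Register the handler, mirroring register_predicates.go; for DefaultProvider to
  // actually run it, CustomNodeLabelPred must also be added to defaultPredicates()
  // in defaults.go before kube-scheduler is recompiled.
  scheduler.RegisterFitPredicate(CustomNodeLabelPred, CustomNodeLabel)
}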
