Tencent Cloud Walks You Through Kubernetes Scheduler Extension (Part 1)

Source: Tencent Cloud Native
Author: 小小小小杜
Date: 2020-11-03
This article introduces the extension mechanisms of the Kubernetes Scheduler.

Preface

Among the Kubernetes components, the Scheduler is relatively simple and single-purpose in both function and logic. Its main job is to watch kube-apiserver for pods whose PodSpec.NodeName is empty, pick the best node for each such pod using predicate (filtering) and priority (scoring) algorithms, and finally bind the pod to that node so that the pod runs there.
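For readers who want to see the "watch for unscheduled pods" step in isolation, the following is a minimal, illustrative client-go sketch (not the scheduler's actual implementation): it lists pods whose spec.nodeName is still empty, which is exactly the set of pods the scheduler needs to place. It assumes an in-cluster configuration and keeps error handling minimal.

package main

import (
  "context"
  "fmt"

  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  "k8s.io/client-go/kubernetes"
  "k8s.io/client-go/rest"
)

func main() {
  // Assumes the program runs inside the cluster; use clientcmd for out-of-cluster configs.
  cfg, err := rest.InClusterConfig()
  if err != nil {
    panic(err)
  }
  clientset := kubernetes.NewForConfigOrDie(cfg)

  // "spec.nodeName=" selects pods that have not been assigned to a node yet --
  // the same pods the scheduler picks up for scheduling.
  pods, err := clientset.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(),
    metav1.ListOptions{FieldSelector: "spec.nodeName="})
  if err != nil {
    panic(err)
  }
  for _, p := range pods.Items {
    fmt.Printf("unscheduled pod: %s/%s\n", p.Namespace, p.Name)
  }
}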

[Figure: overall pod scheduling call flow in Kubernetes]

Expanding the scheduler part of the call flow above, the internal call details (see Kubernetes Scheduler) are shown in the figure below:

[Figure: internal call-flow details of kube-scheduler]

The scheduler ships with many built-in predicate and priority algorithms (see scheduler_algorithm), for example predicates such as NoDiskConflict, PodFitsResources, MatchNodeSelector, and CheckNodeMemoryPressure, and priorities such as LeastRequestedPriority, BalancedResourceAllocation, CalculateAntiAffinityPriority, and NodeAffinityPriority. In real production environments, however, we often need special scheduling policies, such as batch scheduling (a.k.a. coscheduling or gang scheduling), that the default Kubernetes scheduling policy cannot satisfy. In those cases we have to extend the scheduler to implement them.

Scheduler extension approaches

Kubernetes currently supports four ways to implement user-defined scheduling algorithms (predicates & priorities), as follows:

·default-scheduler recoding: add your algorithms directly to the default Kubernetes scheduler and recompile kube-scheduler

·standalone: implement a custom scheduler that runs in parallel with kube-scheduler, deployed in the cluster either on its own or alongside the default kube-scheduler (pods opt into it via spec.schedulerName; see the sketch after this list)

·scheduler extender: implement a "scheduler extender" that kube-scheduler calls over HTTP/HTTPS as a supplement to the default scheduling algorithms (predicates, priorities & bind)

·scheduler framework: implement scheduler framework plugins and recompile kube-scheduler; similar to the first approach, but more standardized and plugin-oriented
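To make the standalone option concrete: a pod selects which scheduler should place it through its spec.schedulerName field (empty means the default kube-scheduler). The snippet below is a minimal, hypothetical client-go sketch, not code from any of the approaches above; the scheduler name my-custom-scheduler is made up.

package main

import (
  "context"

  v1 "k8s.io/api/core/v1"
  metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
  "k8s.io/client-go/kubernetes"
  "k8s.io/client-go/rest"
)

func main() {
  cfg, err := rest.InClusterConfig()
  if err != nil {
    panic(err)
  }
  clientset := kubernetes.NewForConfigOrDie(cfg)

  // spec.schedulerName tells Kubernetes which scheduler should place this pod;
  // if it is left empty, the default kube-scheduler ("default-scheduler") is used.
  pod := &v1.Pod{
    ObjectMeta: metav1.ObjectMeta{Name: "demo", Namespace: "default"},
    Spec: v1.PodSpec{
      SchedulerName: "my-custom-scheduler", // hypothetical standalone scheduler
      Containers:    []v1.Container{{Name: "app", Image: "nginx:1.19"}},
    },
  }
  if _, err := clientset.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
    panic(err)
  }
}

A standalone scheduler would then watch for pods carrying its scheduler name and bind them itself.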

The following sections explain how each of these approaches works and how to develop against it.

default-scheduler recoding

Let's start by examining the scheduling-related entry points of kube-scheduler:

Setting the default predicate & priority policies

See defaultPredicates and defaultPriorities (k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/defaults.go):

func init() {
  registerAlgorithmProvider(defaultPredicates(), defaultPriorities())
}

func defaultPredicates() sets.String {
  return sets.NewString(
    predicates.NoVolumeZoneConflictPred,
    predicates.MaxEBSVolumeCountPred,
    predicates.MaxGCEPDVolumeCountPred,
    predicates.MaxAzureDiskVolumeCountPred,
    predicates.MaxCSIVolumeCountPred,
    predicates.MatchInterPodAffinityPred,
    predicates.NoDiskConflictPred,
    predicates.GeneralPred,
    predicates.PodToleratesNodeTaintsPred,
    predicates.CheckVolumeBindingPred,
    predicates.CheckNodeUnschedulablePred,
  )
}

func defaultPriorities() sets.String {
  return sets.NewString(
    priorities.SelectorSpreadPriority,
    priorities.InterPodAffinityPriority,
    priorities.LeastRequestedPriority,
    priorities.BalancedResourceAllocation,
    priorities.NodePreferAvoidPodsPriority,
    priorities.NodeAffinityPriority,
    priorities.TaintTolerationPriority,
    priorities.ImageLocalityPriority,
  )
}

func registerAlgorithmProvider(predSet, priSet sets.String) {
  // Registers algorithm providers. By default we use 'DefaultProvider', but user can specify one to be used
  // by specifying flag.
  scheduler.RegisterAlgorithmProvider(scheduler.DefaultProvider, predSet, priSet)
  // Cluster autoscaler friendly scheduling algorithm.
  scheduler.RegisterAlgorithmProvider(ClusterAutoscalerProvider, predSet,
    copyAndReplace(priSet, priorities.LeastRequestedPriority, priorities.MostRequestedPriority))
}

const (
  // DefaultProvider defines the default algorithm provider name.
  DefaultProvider = "DefaultProvider"
)
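Given these entry points, the recoding approach boils down to two edits: append your own algorithm names to the default sets above, and register the corresponding handler functions (shown in the next subsections). The sketch below illustrates the first edit to defaults.go; CustomNodeLabelPred and CustomPriority are hypothetical names for algorithms you would define yourself.

// Hypothetical edit to defaults.go: append custom algorithm names to the default sets.
// CustomNodeLabelPred and CustomPriority are placeholders defined by your own code.
func defaultPredicates() sets.String {
  return sets.NewString(
    predicates.NoVolumeZoneConflictPred,
    // ... remaining built-in predicates unchanged ...
    predicates.CheckNodeUnschedulablePred,
    CustomNodeLabelPred, // custom predicate name
  )
}

func defaultPriorities() sets.String {
  return sets.NewString(
    priorities.SelectorSpreadPriority,
    // ... remaining built-in priorities unchanged ...
    priorities.ImageLocalityPriority,
    CustomPriority, // custom priority name
  )
}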

Registering the predicate and priority handler functions

Registering a predicate function (k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/register_predicates.go):

func init() {
  ...
  // Fit is determined by resource availability.
  // This predicate is actually a default predicate, because it is invoked from
  // predicates.GeneralPredicates()
  scheduler.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)
}

Registering a priority function (k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/register_priorities.go):

func init() {
  ...
  // Prioritizes nodes that have labels matching NodeAffinity
  scheduler.RegisterPriorityMapReduceFunction(priorities.NodeAffinityPriority, priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1)
}
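The second edit registers handlers for the custom names, following the same init() pattern. Again, this is only a hypothetical sketch: CustomNodeLabelPredicate, CustomPriorityMap and CustomPriorityReduce are placeholder implementations, sketched in the next subsection.

// Hypothetical registration of the custom algorithms named above.
func init() {
  // Predicate: the name must match the entry added to defaultPredicates().
  scheduler.RegisterFitPredicate(CustomNodeLabelPred, CustomNodeLabelPredicate)

  // Priority: map/reduce functions plus a weight of 1, matching the entry in defaultPriorities().
  scheduler.RegisterPriorityMapReduceFunction(CustomPriority, CustomPriorityMap, CustomPriorityReduce, 1)
}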

Writing the predicate and priority handler functions

The predicate function corresponding to PodFitsResourcesPred is as follows (k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go):

// PodFitsResources checks if a node has sufficient resources, such as cpu, memory, gpu, opaque int resources etc to run a pod.
// First return value indicates whether a node has sufficient resources to run a pod while the second return value indicates the
// predicate failure reasons if the node has insufficient resources to run the pod.
func PodFitsResources(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  node := nodeInfo.Node()
  if node == nil {
    return false, nil, fmt.Errorf("node not found")
  }

  var predicateFails []PredicateFailureReason
  allowedPodNumber := nodeInfo.AllowedPodNumber()
  if len(nodeInfo.Pods())+1 > allowedPodNumber {
    predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourcePods, 1, int64(len(nodeInfo.Pods())), int64(allowedPodNumber)))
  }

  // No extended resources should be ignored by default.
  ignoredExtendedResources := sets.NewString()

  var podRequest *schedulernodeinfo.Resource
  if predicateMeta, ok := meta.(*predicateMetadata); ok && predicateMeta.podFitsResourcesMetadata != nil {
    podRequest = predicateMeta.podFitsResourcesMetadata.podRequest
    if predicateMeta.podFitsResourcesMetadata.ignoredExtendedResources != nil {
      ignoredExtendedResources = predicateMeta.podFitsResourcesMetadata.ignoredExtendedResources
    }
  } else {
    // We couldn't parse metadata - fallback to computing it.
    podRequest = GetResourceRequest(pod)
  }
  if podRequest.MilliCPU == 0 &&
    podRequest.Memory == 0 &&
    podRequest.EphemeralStorage == 0 &&
    len(podRequest.ScalarResources) == 0 {
    return len(predicateFails) == 0, predicateFails, nil
  }

  allocatable := nodeInfo.AllocatableResource()
  if allocatable.MilliCPU < podRequest.MilliCPU+nodeInfo.RequestedResource().MilliCPU {
    predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceCPU, podRequest.MilliCPU, nodeInfo.RequestedResource().MilliCPU, allocatable.MilliCPU))
  }
  if allocatable.Memory < podRequest.Memory+nodeInfo.RequestedResource().Memory {
    predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceMemory, podRequest.Memory, nodeInfo.RequestedResource().Memory, allocatable.Memory))
  }
  if allocatable.EphemeralStorage < podRequest.EphemeralStorage+nodeInfo.RequestedResource().EphemeralStorage {
    predicateFails = append(predicateFails, NewInsufficientResourceError(v1.ResourceEphemeralStorage, podRequest.EphemeralStorage, nodeInfo.RequestedResource().EphemeralStorage, allocatable.EphemeralStorage))
  }

  for rName, rQuant := range podRequest.ScalarResources {
    if v1helper.IsExtendedResourceName(rName) {
      // If this resource is one of the extended resources that should be
      // ignored, we will skip checking it.
      if ignoredExtendedResources.Has(string(rName)) {
        continue
      }
    }
    if allocatable.ScalarResources[rName] < rQuant+nodeInfo.RequestedResource().ScalarResources[rName] {
      predicateFails = append(predicateFails, NewInsufficientResourceError(rName, podRequest.ScalarResources[rName], nodeInfo.RequestedResource().ScalarResources[rName], allocatable.ScalarResources[rName]))
    }
  }

  if klog.V(10) {
    if len(predicateFails) == 0 {
      // We explicitly don't do klog.V(10).Infof() to avoid computing all the parameters if this is
      // not logged. There is visible performance gain from it.
      klog.Infof("Schedule Pod %+v on Node %+v is allowed, Node is running only %v out of %v Pods.",
        podName(pod), node.Name, len(nodeInfo.Pods()), allowedPodNumber)
    }
  }
  return len(predicateFails) == 0, predicateFails, nil
}
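A custom predicate just has to implement the same signature: (pod, metadata, nodeInfo) returning (fit, failure reasons, error). Below is a minimal hypothetical sketch, assuming it lives in the same predicates package as PodFitsResources above; the example.com/dedicated label and the reuse of the package's generic ErrNodeLabelPresenceViolated failure reason are illustrative choices only, not part of the article.

// CustomNodeLabelPred is the hypothetical predicate name used in the registration sketches above.
const CustomNodeLabelPred = "CustomNodeLabelMatch"

// CustomNodeLabelPredicate rejects nodes that do not carry the label
// "example.com/dedicated=true" (placeholder logic for illustration only).
func CustomNodeLabelPredicate(pod *v1.Pod, meta Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []PredicateFailureReason, error) {
  node := nodeInfo.Node()
  if node == nil {
    return false, nil, fmt.Errorf("node not found")
  }
  if node.Labels["example.com/dedicated"] != "true" {
    // Reuse an existing generic failure reason from this package for simplicity;
    // a real implementation would likely define its own PredicateFailureReason.
    return false, []PredicateFailureReason{ErrNodeLabelPresenceViolated}, nil
  }
  return true, nil, nil
}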

The Map and Reduce functions for the NodeAffinityPriority priority are as follows (k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/node_affinity.go):

// CalculateNodeAffinityPriorityMap prioritizes nodes according to node affinity scheduling preferences
// indicated in PreferredDuringSchedulingIgnoredDuringExecution. Each time a node matches a preferredSchedulingTerm,
// it will get an add of preferredSchedulingTerm.Weight. Thus, the more preferredSchedulingTerms
// the node satisfies and the more the preferredSchedulingTerm that is satisfied weights, the higher
// score the node gets.
func CalculateNodeAffinityPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
  node := nodeInfo.Node()
  if node == nil {
    return framework.NodeScore{}, fmt.Errorf("node not found")
  }

  // default is the podspec.
  affinity := pod.Spec.Affinity
  if priorityMeta, ok := meta.(*priorityMetadata); ok {
    // We were able to parse metadata, use affinity from there.
    affinity = priorityMeta.affinity
  }

  var count int32
  // A nil element of PreferredDuringSchedulingIgnoredDuringExecution matches no objects.
  // An element of PreferredDuringSchedulingIgnoredDuringExecution that refers to an
  // empty PreferredSchedulingTerm matches all objects.
  if affinity != nil && affinity.NodeAffinity != nil && affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution != nil {
    // Match PreferredDuringSchedulingIgnoredDuringExecution term by term.
    for i := range affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution {
      preferredSchedulingTerm := &affinity.NodeAffinity.PreferredDuringSchedulingIgnoredDuringExecution[i]
      if preferredSchedulingTerm.Weight == 0 {
        continue
      }

      // TODO: Avoid computing it for all nodes if this becomes a performance problem.
      nodeSelector, err := v1helper.NodeSelectorRequirementsAsSelector(preferredSchedulingTerm.Preference.MatchExpressions)
      if err != nil {
        return framework.NodeScore{}, err
      }
      if nodeSelector.Matches(labels.Set(node.Labels)) {
        count += preferredSchedulingTerm.Weight
      }
    }
  }

  return framework.NodeScore{
    Name:  node.Name,
    Score: int64(count),
  }, nil
}

// CalculateNodeAffinityPriorityReduce is a reduce function for node affinity priority calculation.
var CalculateNodeAffinityPriorityReduce = NormalizeReduce(framework.MaxNodeScore, false)
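A custom priority follows the same map/reduce shape: the map function returns a raw framework.NodeScore for a single node, and an optional reduce function normalizes the scores across nodes. Below is a minimal hypothetical sketch, assuming it sits in the same priorities package; it matches the CustomPriority registration sketched earlier and simply favors nodes that are running fewer pods.

// CustomPriority is the hypothetical priority name used in the registration sketches above.
const CustomPriority = "CustomFewestPodsPriority"

// CustomPriorityMap returns the node's current pod count as its raw score.
func CustomPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
  node := nodeInfo.Node()
  if node == nil {
    return framework.NodeScore{}, fmt.Errorf("node not found")
  }
  return framework.NodeScore{
    Name:  node.Name,
    Score: int64(len(nodeInfo.Pods())),
  }, nil
}

// CustomPriorityReduce normalizes raw scores to [0, framework.MaxNodeScore] and reverses them,
// so nodes running fewer pods end up with higher final scores.
var CustomPriorityReduce = NormalizeReduce(framework.MaxNodeScore, true)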
