Tencent Cloud: A One-Article Guide to Kubernetes Scheduler Extension (Part 2)

Source: Tencent Cloud Native
Author: 小小小小杜
Date: 2020-11-03
This article introduces the extension mechanisms of the Kubernetes Scheduler.


·相關(guān)使用

Next, let's look at how the kube-scheduler scheduling algorithms (predicates & priorities) make use of the registration operations described above:

// Fit is determined by resource availability.
// This predicate is actually a default predicate, because it is invoked from
// predicates.GeneralPredicates()
scheduler.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)

...

// RegisterFitPredicate registers a fit predicate with the algorithm
// registry. Returns the name with which the predicate was registered.
func RegisterFitPredicate(name string, predicate predicates.FitPredicate) string {
  return RegisterFitPredicateFactory(name, func(AlgorithmFactoryArgs) predicates.FitPredicate { return predicate })
}

...

// RegisterFitPredicateFactory registers a fit predicate factory with the
// algorithm registry. Returns the name with which the predicate was registered.
func RegisterFitPredicateFactory(name string, predicateFactory FitPredicateFactory) string {
  schedulerFactoryMutex.Lock()
  defer schedulerFactoryMutex.Unlock()
  validateAlgorithmNameOrDie(name)
  fitPredicateMap[name] = predicateFactory
  return name
}
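To make the registration flow above concrete, here is a minimal sketch of what a custom predicate and its registration could look like. Everything specific to the example is made up: the function name, the predicate name "CustomLabelPred", the node label it checks, and the local failure-reason type (which assumes predicates.PredicateFailureReason only requires a GetReason() string method). The FitPredicate signature follows the call made in podFitsOnNode further below.

// Sketch only. Imports assumed: fmt, v1 "k8s.io/api/core/v1",
// "k8s.io/kubernetes/pkg/scheduler" (as scheduler),
// "k8s.io/kubernetes/pkg/scheduler/algorithm/predicates",
// schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo".

// customFailureReason is a hypothetical predicates.PredicateFailureReason.
type customFailureReason struct{ reason string }

func (r customFailureReason) GetReason() string { return r.reason }

// NodeHasCustomLabel is a hypothetical fit predicate: it only admits nodes that
// carry the (made-up) label example.com/custom-capable.
func NodeHasCustomLabel(pod *v1.Pod, meta predicates.Metadata, nodeInfo *schedulernodeinfo.NodeInfo) (bool, []predicates.PredicateFailureReason, error) {
  node := nodeInfo.Node()
  if node == nil {
    return false, nil, fmt.Errorf("node not found")
  }
  if _, ok := node.Labels["example.com/custom-capable"]; ok {
    return true, nil, nil
  }
  // Returning fit=false with a reason (instead of an error) lets the scheduler
  // simply skip this node and keep evaluating the others.
  return false, []predicates.PredicateFailureReason{customFailureReason{"node lacks label example.com/custom-capable"}}, nil
}

// Registration mirrors register_predicates.go, which wires the defaults up in init().
func init() {
  scheduler.RegisterFitPredicate("CustomLabelPred", NodeHasCustomLabel)
}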


...

// Prioritizes nodes that have labels matching NodeAffinity
scheduler.RegisterPriorityMapReduceFunction(priorities.NodeAffinityPriority, priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1)

...

// RegisterPriorityMapReduceFunction registers a priority function with the algorithm registry. Returns the name,
// with which the function was registered.
func RegisterPriorityMapReduceFunction(
  name string,
  mapFunction priorities.PriorityMapFunction,
  reduceFunction priorities.PriorityReduceFunction,
  weight int) string {
  return RegisterPriorityConfigFactory(name, PriorityConfigFactory{
    MapReduceFunction: func(AlgorithmFactoryArgs) (priorities.PriorityMapFunction, priorities.PriorityReduceFunction) {
      return mapFunction, reduceFunction
    },
    Weight: int64(weight),
  })
}

...

// RegisterPriorityConfigFactory registers a priority config factory with its name.
func RegisterPriorityConfigFactory(name string, pcf PriorityConfigFactory) string {
  schedulerFactoryMutex.Lock()
  defer schedulerFactoryMutex.Unlock()
  validateAlgorithmNameOrDie(name)
  priorityFunctionMap[name] = pcf
  return name
}
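A custom priority can be added in the same way as a Map function, with an optional Reduce step (several built-in priorities pass nil for the reduce function). Below is a minimal sketch; the priority name, label key and scoring values are invented for illustration, and the Map signature is the one implied by the prioritizeNodes code further below.

// Sketch only. Imports assumed: fmt, v1 "k8s.io/api/core/v1",
// "k8s.io/kubernetes/pkg/scheduler" (as scheduler),
// the framework package used by genericScheduler,
// schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo".

// CustomLabelPriorityMap is a hypothetical Map function: nodes carrying the
// (made-up) label example.com/preferred get the top score, all others get 0,
// following the 0-10 convention quoted in prioritizeNodes below.
func CustomLabelPriorityMap(pod *v1.Pod, meta interface{}, nodeInfo *schedulernodeinfo.NodeInfo) (framework.NodeScore, error) {
  node := nodeInfo.Node()
  if node == nil {
    return framework.NodeScore{}, fmt.Errorf("node not found")
  }
  score := int64(0)
  if _, ok := node.Labels["example.com/preferred"]; ok {
    score = 10
  }
  return framework.NodeScore{Name: node.Name, Score: score}, nil
}

// Registration mirrors register_priorities.go: nil Reduce, weight 1.
func init() {
  scheduler.RegisterPriorityMapReduceFunction("CustomLabelPriority", CustomLabelPriorityMap, nil, 1)
}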


...

// (g.predicates)
// podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
// For given pod, podFitsOnNode will check if any equivalent pod exists and try to reuse its cached
// predicate results as possible.
// This function is called from two different places: Schedule and Preempt.
// When it is called from Schedule, we want to test whether the pod is schedulable
// on the node with all the existing pods on the node plus higher and equal priority
// pods nominated to run on the node.
// When it is called from Preempt, we should remove the victims of preemption and
// add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
// It removes victims from meta and NodeInfo before calling this function.
func (g *genericScheduler) podFitsOnNode(
  ctx context.Context,
  state *framework.CycleState,
  pod *v1.Pod,
  meta predicates.Metadata,
  info *schedulernodeinfo.NodeInfo,
  alwaysCheckAllPredicates bool,
) (bool, []predicates.PredicateFailureReason, *framework.Status, error) {
  var failedPredicates []predicates.PredicateFailureReason
  var status *framework.Status

  podsAdded := false
  // We run predicates twice in some cases. If the node has greater or equal priority
  // nominated pods, we run them when those pods are added to meta and nodeInfo.
  // If all predicates succeed in this pass, we run them again when these
  // nominated pods are not added. This second pass is necessary because some
  // predicates such as inter-pod affinity may not pass without the nominated pods.
  // If there are no nominated pods for the node or if the first run of the
  // predicates fail, we don't run the second pass.
  // We consider only equal or higher priority pods in the first pass, because
  // those are the current "pod" must yield to them and not take a space opened
  // for running them. It is ok if the current "pod" take resources freed for
  // lower priority pods.
  // Requiring that the new pod is schedulable in both circumstances ensures that
  // we are making a conservative decision: predicates like resources and inter-pod
  // anti-affinity are more likely to fail when the nominated pods are treated
  // as running, while predicates like pod affinity are more likely to fail when
  // the nominated pods are treated as not running. We can't just assume the
  // nominated pods are running because they are not running right now and in fact,
  // they may end up getting scheduled to a different node.
  for i := 0; i < 2; i++ {
    ...
    for _, predicateKey := range predicates.Ordering() {
      ...
      if predicate, exist := g.predicates[predicateKey]; exist {
        fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
        if err != nil {
          return false, []predicates.PredicateFailureReason{}, nil, err
        }
        ...
      }
    }
  }

  return len(failedPredicates) == 0 && status.IsSuccess(), failedPredicates, status, nil
}


...

// (g.prioritizers)
// prioritizeNodes prioritizes the nodes by running the individual priority functions in parallel.
// Each priority function is expected to set a score of 0-10
// 0 is the lowest priority score (least preferred node) and 10 is the highest
// Each priority function can also have its own weight
// The node scores returned by the priority function are multiplied by the weights to get weighted scores
// All scores are finally combined (added) to get the total weighted scores of all nodes
func (g *genericScheduler) prioritizeNodes(
  ctx context.Context,
  state *framework.CycleState,
  pod *v1.Pod,
  meta interface{},
  nodes []*v1.Node,
) (framework.NodeScoreList, error) {

  workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
    nodeInfo := g.nodeInfoSnapshot.NodeInfoMap[nodes[index].Name]
    for i := range g.prioritizers {
      var err error
      results[i][index], err = g.prioritizers[i].Map(pod, meta, nodeInfo)
      if err != nil {
        appendError(err)
        results[i][index].Name = nodes[index].Name
      }
    }
  })

  for i := range g.prioritizers {
    if g.prioritizers[i].Reduce == nil {
      continue
    }
    wg.Add(1)
    go func(index int) {
      metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Inc()
      defer func() {
        metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Dec()
        wg.Done()
      }()
      if err := g.prioritizers[index].Reduce(pod, meta, g.nodeInfoSnapshot, results[index]); err != nil {
        appendError(err)
      }
      if klog.V(10) {
        for _, hostPriority := range results[index] {
          klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Name, g.prioritizers[index].Name, hostPriority.Score)
        }
      }
    }(i)
  }
  // Wait for all computations to be finished.
  wg.Wait()
  ...
}
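The elided tail of prioritizeNodes performs the aggregation described in its header comment: each priority's per-node score is multiplied by that priority's weight, and the weighted scores are summed into one total per node. The helper below is a simplified, self-contained sketch of that idea, not the literal upstream code.

// combineScores illustrates the weighted aggregation at the end of prioritizeNodes.
// perPriorityScores is indexed [priority][node], matching results above.
func combineScores(nodeNames []string, perPriorityScores [][]int64, weights []int64) framework.NodeScoreList {
  result := make(framework.NodeScoreList, 0, len(nodeNames))
  for i, name := range nodeNames {
    total := int64(0)
    for j := range perPriorityScores {
      total += perPriorityScores[j][i] * weights[j]
    }
    result = append(result, framework.NodeScore{Name: name, Score: total})
  }
  return result
}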

In summary, to add scheduling policies on top of kube-scheduler, proceed as follows:

·Add the new policies to the default predicates & priorities: defaultPredicates and defaultPriorities (k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/defaults.go); see the sketch after this list

·Register the predicate and priority handlers: predicates are registered in k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/register_predicates.go, priorities in k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/register_priorities.go

·Implement the predicate and priority handlers: predicates in k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go, priority Map+Reduce functions in k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/xxx.go

·Besides the built-in defaults, you can also specify a scheduling policy manually via the --policy-config-file flag (which overrides the default policy), e.g. examples/scheduler-policy-config.json
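The following is a hypothetical sketch of the first step. It assumes defaults.go exposes defaultPredicates()/defaultPriorities() helpers returning a sets.String of registered algorithm names (the exact shape varies across scheduler versions); the custom names are the made-up ones from the earlier sketches, and the built-in entries are abbreviated.

// Sketch only: wiring the custom algorithms into the default provider (defaults.go).
// Assumes sets = "k8s.io/apimachinery/pkg/util/sets".
func defaultPredicates() sets.String {
  return sets.NewString(
    predicates.PodFitsResourcesPred,
    // ... the other built-in predicate names ...
    "CustomLabelPred", // hypothetical name registered in the predicate sketch above
  )
}

func defaultPriorities() sets.String {
  return sets.NewString(
    priorities.NodeAffinityPriority,
    // ... the other built-in priority names ...
    "CustomLabelPriority", // hypothetical name registered in the priority sketch above
  )
}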

standalone

Compared with recoding, which only involves small code changes, the standalone approach means heavily customizing kube-scheduler itself. Its pros and cons are:

·Pros

  ·Allows the scheduler to be refactored and customized to the greatest possible extent

·Cons

  ·In real projects, if you only want to add predicates or priorities you would switch to the first approach instead of developing and deploying a separate scheduler

  ·A heavily customized scheduler is hard to develop (it requires deep familiarity with the scheduler code), has a large impact on the Kubernetes cluster (whether it is deployed on its own or alongside the default scheduler), and carries high upgrade and maintenance costs

  ·It may lead to scheduling conflicts: when two schedulers are deployed at the same time, one scheduler may bind a pod to a node whose resources have in fact already been allocated by the other

Therefore, adopt the standalone approach only when the other options cannot satisfy your extension needs, and deploy only one scheduler in production.

scheduler extender

對于Kubernetes項目來說,它很樂意開發(fā)者使用并向它提bug或者PR(受歡迎),但是不建議開發(fā)者為了實現(xiàn)業(yè)務(wù)需求直接修改Kubernetes核心代碼,因為這樣做會影響Kubernetes本身的代碼質(zhì)量以及穩(wěn)定性。因此Kubernetes希望盡可能通過外圍的方式來解決客戶自定義的需求。

In fact, any good project should think this way: keep the core code as small as possible, so that it rarely changes or can only be changed by maintainers (improving code quality and reducing the project's own development and maintenance cost), and push third-party requirements out to the periphery as much as possible (giving users freedom), for example through plugins (e.g. CNI, CRI, CSI and the scheduler framework).

The default-scheduler recoding and standalone approaches described above are both intrusive and not very elegant, whereas scheduler extender and scheduler framework are non-intrusive. This section focuses on the scheduler extender.

A scheduler extender works like a webhook: after the default scheduling algorithms have run, kube-scheduler calls the extender over http/https; the extender server executes the custom predicate & priority logic and returns the prescribed fields to the scheduler, which combines this information to make the final scheduling decision, thereby implementing the extension logic on top of the extender.
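To illustrate that flow, here is a minimal, self-contained sketch of an extender server exposing a filter endpoint. The request/response structs are simplified stand-ins for the extender API types shipped with kube-scheduler (ExtenderArgs / ExtenderFilterResult), the listen address and node-name rule are made up, and a real extender would typically also implement a prioritize (and possibly bind) endpoint; this sketch is not part of the kube-scheduler source discussed in this article.

// A minimal sketch of a scheduler extender's "filter" endpoint.
// The structs below mirror only the subset of the extender API this sketch needs;
// a real extender would use the ExtenderArgs/ExtenderFilterResult types from kube-scheduler.
package main

import (
  "encoding/json"
  "log"
  "net/http"
  "strings"
)

type extenderArgs struct {
  NodeNames *[]string `json:"nodenames,omitempty"`
}

type extenderFilterResult struct {
  NodeNames   *[]string         `json:"nodenames,omitempty"`
  FailedNodes map[string]string `json:"failedNodes,omitempty"`
  Error       string            `json:"error,omitempty"`
}

// filter keeps only nodes whose name starts with a made-up prefix, standing in
// for arbitrary custom predicate logic (e.g. resources kube-scheduler itself
// does not manage).
func filter(w http.ResponseWriter, r *http.Request) {
  var args extenderArgs
  if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
    json.NewEncoder(w).Encode(extenderFilterResult{Error: err.Error()})
    return
  }
  accepted := []string{}
  failed := map[string]string{}
  if args.NodeNames != nil {
    for _, name := range *args.NodeNames {
      if strings.HasPrefix(name, "gpu-") { // hypothetical rule
        accepted = append(accepted, name)
      } else {
        failed[name] = "rejected by example extender"
      }
    }
  }
  json.NewEncoder(w).Encode(extenderFilterResult{NodeNames: &accepted, FailedNodes: failed})
}

func main() {
  // kube-scheduler is pointed at this server through the extender section of its
  // policy configuration (urlPrefix + filterVerb).
  http.HandleFunc("/filter", filter)
  log.Fatal(http.ListenAndServe(":8888", nil))
}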

The scheduler extender is best suited to scenarios where the scheduling policy depends on resources not managed by the standard kube-scheduler, although you can also use an extender to achieve the same functionality as the two approaches above.

下面我們結(jié)合代碼說明extender的使用原理:

