·Related usage
Next, let's look at how the kube-scheduler scheduling algorithm (predicates and priorities) ties in with the operations described above:
// Fit is determined by resource availability.
// This predicate is actually a default predicate, because it is invoked from
// predicates.GeneralPredicates()
scheduler.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)
...
// RegisterFitPredicate registers a fit predicate with the algorithm
// registry. Returns the name with which the predicate was registered.
func RegisterFitPredicate(name string, predicate predicates.FitPredicate) string {
	return RegisterFitPredicateFactory(name, func(AlgorithmFactoryArgs) predicates.FitPredicate { return predicate })
}
...
// RegisterFitPredicateFactory registers a fit predicate factory with the
// algorithm registry. Returns the name with which the predicate was registered.
func RegisterFitPredicateFactory(name string, predicateFactory FitPredicateFactory) string {
	schedulerFactoryMutex.Lock()
	defer schedulerFactoryMutex.Unlock()
	validateAlgorithmNameOrDie(name)
	fitPredicateMap[name] = predicateFactory
	return name
}
...
// Prioritizes nodes that have labels matching NodeAffinity
scheduler.RegisterPriorityMapReduceFunction(priorities.NodeAffinityPriority, priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1)
...
// RegisterPriorityMapReduceFunction registers a priority function with the algorithm registry. Returns the name,
// with which the function was registered.
func RegisterPriorityMapReduceFunction(
	name string,
	mapFunction priorities.PriorityMapFunction,
	reduceFunction priorities.PriorityReduceFunction,
	weight int) string {
	return RegisterPriorityConfigFactory(name, PriorityConfigFactory{
		MapReduceFunction: func(AlgorithmFactoryArgs) (priorities.PriorityMapFunction, priorities.PriorityReduceFunction) {
			return mapFunction, reduceFunction
		},
		Weight: int64(weight),
	})
}
...
// RegisterPriorityConfigFactory registers a priority config factory with its name.
func RegisterPriorityConfigFactory(name string, pcf PriorityConfigFactory) string {
	schedulerFactoryMutex.Lock()
	defer schedulerFactoryMutex.Unlock()
	validateAlgorithmNameOrDie(name)
	priorityFunctionMap[name] = pcf
	return name
}
...
// (g.predicates)
// podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
// For given pod, podFitsOnNode will check if any equivalent pod exists and try to reuse its cached
// predicate results as possible.
// This function is called from two different places: Schedule and Preempt.
// When it is called from Schedule, we want to test whether the pod is schedulable
// on the node with all the existing pods on the node plus higher and equal priority
// pods nominated to run on the node.
// When it is called from Preempt, we should remove the victims of preemption and
// add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
// It removes victims from meta and NodeInfo before calling this function.
func (g *genericScheduler) podFitsOnNode(
	ctx context.Context,
	state *framework.CycleState,
	pod *v1.Pod,
	meta predicates.Metadata,
	info *schedulernodeinfo.NodeInfo,
	alwaysCheckAllPredicates bool,
) (bool, []predicates.PredicateFailureReason, *framework.Status, error) {
	var failedPredicates []predicates.PredicateFailureReason
	var status *framework.Status
	podsAdded := false
	// We run predicates twice in some cases. If the node has greater or equal priority
	// nominated pods, we run them when those pods are added to meta and nodeInfo.
	// If all predicates succeed in this pass, we run them again when these
	// nominated pods are not added. This second pass is necessary because some
	// predicates such as inter-pod affinity may not pass without the nominated pods.
	// If there are no nominated pods for the node or if the first run of the
	// predicates fail, we don't run the second pass.
	// We consider only equal or higher priority pods in the first pass, because
	// those are the current "pod" must yield to them and not take a space opened
	// for running them. It is ok if the current "pod" take resources freed for
	// lower priority pods.
	// Requiring that the new pod is schedulable in both circumstances ensures that
	// we are making a conservative decision: predicates like resources and inter-pod
	// anti-affinity are more likely to fail when the nominated pods are treated
	// as running, while predicates like pod affinity are more likely to fail when
	// the nominated pods are treated as not running. We can't just assume the
	// nominated pods are running because they are not running right now and in fact,
	// they may end up getting scheduled to a different node.
	for i := 0; i < 2; i++ {
		...
		for _, predicateKey := range predicates.Ordering() {
			...
			if predicate, exist := g.predicates[predicateKey]; exist {
				fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
				if err != nil {
					return false, []predicates.PredicateFailureReason{}, nil, err
				}
				...
			}
		}
	}
	return len(failedPredicates) == 0 && status.IsSuccess(), failedPredicates, status, nil
}
...
// (g.prioritizers)
// prioritizeNodes prioritizes the nodes by running the individual priority functions in parallel.
// Each priority function is expected to set a score of 0-10
// 0 is the lowest priority score (least preferred node) and 10 is the highest
// Each priority function can also have its own weight
// The node scores returned by the priority function are multiplied by the weights to get weighted scores
// All scores are finally combined (added) to get the total weighted scores of all nodes
func (g *genericScheduler) prioritizeNodes(
	ctx context.Context,
	state *framework.CycleState,
	pod *v1.Pod,
	meta interface{},
	nodes []*v1.Node,
) (framework.NodeScoreList, error) {
	workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
		nodeInfo := g.nodeInfoSnapshot.NodeInfoMap[nodes[index].Name]
		for i := range g.prioritizers {
			var err error
			results[i][index], err = g.prioritizers[i].Map(pod, meta, nodeInfo)
			if err != nil {
				appendError(err)
				results[i][index].Name = nodes[index].Name
			}
		}
	})
	for i := range g.prioritizers {
		if g.prioritizers[i].Reduce == nil {
			continue
		}
		wg.Add(1)
		go func(index int) {
			metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Inc()
			defer func() {
				metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Dec()
				wg.Done()
			}()
			if err := g.prioritizers[index].Reduce(pod, meta, g.nodeInfoSnapshot, results[index]); err != nil {
				appendError(err)
			}
			if klog.V(10) {
				for _, hostPriority := range results[index] {
					klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Name, g.prioritizers[index].Name, hostPriority.Score)
				}
			}
		}(i)
	}
	// Wait for all computations to be finished.
	wg.Wait()
	...
}
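The two phases in the excerpts above can be condensed into a small, self-contained sketch: first filter nodes with predicates, then run Map on each node, Reduce over all raw scores, and sum the weighted results. This is only an illustration under simplifying assumptions. The `Node`, `Pod`, `Predicate` and `Priority` types and the functions `filterNodes`, `scoreNodes`, `normalizeTo10`, `fitsCPU` and `freeAfterPlacement` are hypothetical stand-ins, not the scheduler's real types, and the sketch runs sequentially rather than in parallel.

```go
package main

import "fmt"

// Node and Pod are hypothetical, simplified stand-ins for NodeInfo and v1.Pod.
type Node struct {
	Name    string
	FreeCPU int64 // millicores not yet allocated
}

type Pod struct {
	RequestCPU int64
}

// Predicate decides whether a pod can run on a node (predicate phase).
type Predicate func(pod Pod, node Node) bool

// Priority pairs a per-node Map with an optional Reduce and a weight.
type Priority struct {
	Map    func(pod Pod, node Node) int64
	Reduce func(scores []int64)
	Weight int64
}

// filterNodes keeps only the nodes that pass every predicate.
func filterNodes(pod Pod, nodes []Node, preds []Predicate) []Node {
	var fit []Node
nextNode:
	for _, n := range nodes {
		for _, p := range preds {
			if !p(pod, n) {
				continue nextNode
			}
		}
		fit = append(fit, n)
	}
	return fit
}

// scoreNodes runs Map per node, then Reduce, then adds up weighted scores.
func scoreNodes(pod Pod, nodes []Node, prios []Priority) map[string]int64 {
	total := make(map[string]int64, len(nodes))
	for _, pr := range prios {
		raw := make([]int64, len(nodes))
		for i, n := range nodes {
			raw[i] = pr.Map(pod, n)
		}
		if pr.Reduce != nil {
			pr.Reduce(raw)
		}
		for i, n := range nodes {
			total[n.Name] += raw[i] * pr.Weight
		}
	}
	return total
}

// normalizeTo10 rescales raw scores so that the best node scores 10.
func normalizeTo10(scores []int64) {
	var highest int64
	for _, s := range scores {
		if s > highest {
			highest = s
		}
	}
	if highest == 0 {
		return
	}
	for i := range scores {
		scores[i] = scores[i] * 10 / highest
	}
}

func fitsCPU(pod Pod, node Node) bool { return node.FreeCPU >= pod.RequestCPU }

func freeAfterPlacement(pod Pod, node Node) int64 { return node.FreeCPU - pod.RequestCPU }

// demo filters three made-up nodes and scores the survivors.
func demo() ([]Node, map[string]int64) {
	pod := Pod{RequestCPU: 1000}
	nodes := []Node{{Name: "a", FreeCPU: 500}, {Name: "b", FreeCPU: 2000}, {Name: "c", FreeCPU: 4000}}
	fit := filterNodes(pod, nodes, []Predicate{fitsCPU})
	prios := []Priority{{Map: freeAfterPlacement, Reduce: normalizeTo10, Weight: 2}}
	return fit, scoreNodes(pod, fit, prios)
}

func main() {
	fit, scores := demo()
	fmt.Println(len(fit), scores["b"], scores["c"]) // prints "2 6 20"
}
```

Node a is dropped by the predicate, so only b and c are scored. Splitting scoring into Map and Reduce lets the per-node work stay independent, while normalization, which needs all scores at once, happens in a single later step.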
In summary, to add a scheduling policy on top of kube-scheduler, follow these steps:
·Set the default predicate and priority policies: defaultPredicates and defaultPriorities (k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/defaults.go)
·Register the predicate and priority functions: register predicates in k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/register_predicates.go; register priorities in k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/register_priorities.go
·Write the predicate and priority functions: write predicates in k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go; write priority functions as Map + Reduce in k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/xxx.go
·Besides the built-in default predicates and priorities, you can also specify the scheduling policy manually with the --policy-config-file command-line flag (this overrides the default policy), for example examples/scheduler-policy-config.json
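The registration step above boils down to a name-keyed, mutex-protected registry of factories. The sketch below mimics that pattern in a self-contained form. The simplified `FitPredicate` signature, the `buildPredicates` helper, and the `LeavesHeadroom` predicate are assumptions made for illustration; they are not part of kube-scheduler.

```go
package main

import (
	"fmt"
	"sync"
)

// FitPredicate is a hypothetical, simplified predicate signature.
type FitPredicate func(podCPU, nodeFreeCPU int64) bool

// FitPredicateFactory builds a predicate on demand.
type FitPredicateFactory func() FitPredicate

var (
	registryMu      sync.Mutex
	fitPredicateMap = map[string]FitPredicateFactory{}
)

// RegisterFitPredicate wraps a ready-made predicate in a trivial factory.
func RegisterFitPredicate(name string, p FitPredicate) string {
	return RegisterFitPredicateFactory(name, func() FitPredicate { return p })
}

// RegisterFitPredicateFactory stores the factory under its name.
func RegisterFitPredicateFactory(name string, f FitPredicateFactory) string {
	registryMu.Lock()
	defer registryMu.Unlock()
	fitPredicateMap[name] = f
	return name
}

// buildPredicates resolves names (as a policy might list them) to predicates.
// It is a hypothetical helper, not a kube-scheduler function.
func buildPredicates(names []string) (map[string]FitPredicate, error) {
	registryMu.Lock()
	defer registryMu.Unlock()
	out := make(map[string]FitPredicate, len(names))
	for _, name := range names {
		factory, ok := fitPredicateMap[name]
		if !ok {
			return nil, fmt.Errorf("predicate %q is not registered", name)
		}
		out[name] = factory()
	}
	return out, nil
}

// leavesHeadroom is a made-up custom predicate: the pod may use at most
// half of the CPU that is still free on the node.
func leavesHeadroom(podCPU, nodeFreeCPU int64) bool {
	return podCPU*2 <= nodeFreeCPU
}

func main() {
	name := RegisterFitPredicate("LeavesHeadroom", leavesHeadroom)
	preds, err := buildPredicates([]string{name})
	if err != nil {
		panic(err)
	}
	fmt.Println(preds[name](500, 1000), preds[name](600, 1000)) // prints "true false"

	_, err = buildPredicates([]string{"NoSuchPredicate"})
	fmt.Println(err != nil) // prints "true"
}
```

Keeping factories rather than plain functions in the map lets a predicate be constructed late, once whatever arguments it depends on are available.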
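standalone
Whereas recoding only makes small code changes, standalone means heavily customizing kube-scheduler into a scheduler of your own. The pros and cons of this approach are:
·Pros
  ·Allows the greatest degree of restructuring and customization of the scheduler
·Cons
  ·In practice, if all you need is an extra predicate or priority policy, you would fall back to the first approach rather than develop and deploy a separate scheduler
  ·Customizing the scheduler is hard (at minimum you must know the scheduler code very well) and has a large impact on the Kubernetes cluster (whether it is deployed as the only scheduler or side by side with the default one), so later upgrades and maintenance are costly
  ·Scheduling conflicts can occur: with two schedulers deployed at once, by the time one scheduler binds a pod, the resources it counted on may already have been allocated by the other
We therefore recommend the standalone approach only when no other approach meets the extension requirements, and running only one scheduler in production.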
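The scheduling conflict mentioned above can be shown with a deliberately tiny toy model: two schedulers each decide from their own snapshot of a node, and neither sees the other's bind. Every type and function here (`node`, `scheduler`, `observe`, `fits`, `bind`, `simulate`) is hypothetical, and the model leaves out everything a real cluster does afterwards.

```go
package main

import "fmt"

// node is a toy model of a node with a single resource dimension.
type node struct {
	capacity  int64
	allocated int64
}

// scheduler keeps its own, possibly stale, view of the node's usage.
type scheduler struct {
	name          string
	seenAllocated int64
}

// observe refreshes the scheduler's view from the real node state.
func (s *scheduler) observe(n *node) { s.seenAllocated = n.allocated }

// fits checks the request against the scheduler's view, not the real state.
func (s *scheduler) fits(n *node, request int64) bool {
	return s.seenAllocated+request <= n.capacity
}

// bind commits the request to the real node state without re-checking.
func bind(n *node, request int64) { n.allocated += request }

// simulate lets two schedulers place a 3-unit pod each on a 4-unit node
// using snapshots taken before either of them binds.
func simulate() (fitA, fitB bool, allocated, capacity int64) {
	n := &node{capacity: 4}
	a := &scheduler{name: "default-scheduler"}
	b := &scheduler{name: "custom-scheduler"}
	a.observe(n)
	b.observe(n)

	fitA = a.fits(n, 3)
	if fitA {
		bind(n, 3)
	}
	// b still works from its old snapshot and does not see a's bind.
	fitB = b.fits(n, 3)
	if fitB {
		bind(n, 3)
	}
	return fitA, fitB, n.allocated, n.capacity
}

func main() {
	fitA, fitB, allocated, capacity := simulate()
	fmt.Println(fitA, fitB, allocated, capacity) // prints "true true 6 4"
}
```

Both schedulers believe their pod fits, yet the node ends up with 6 units requested against a capacity of 4.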
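scheduler extender
The Kubernetes project is happy for developers to use it and welcomes bug reports and PRs, but it discourages modifying Kubernetes core code directly just to meet business requirements, because doing so affects the code quality and stability of Kubernetes itself. Kubernetes therefore prefers to address custom requirements through peripheral mechanisms wherever possible.
In fact, any good project should think this way: factor out a core that rarely changes, or that only maintainers may change (this improves code quality and lowers the project's own development and operations costs), and push third-party requirements out to the periphery wherever possible (this gives users freedom), for example in the form of plugins (e.g. CNI, CRI, CSI, and the scheduler framework).
The default-scheduler recoding and standalone approaches described above are both intrusive and not very elegant, whereas scheduler extender and scheduler framework are non-intrusive. This section focuses on scheduler extender.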
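A scheduler extender is similar to a webhook. After the default scheduling algorithm has run, kube-scheduler calls the extender over HTTP/HTTPS. The extender server runs the custom predicate and priority logic and returns the prescribed fields to the scheduler, which then combines this information to make the final scheduling decision. This is how extender-based extension works.
Scheduler extender suits scenarios where the scheduling policy depends on resources that the standard kube-scheduler does not manage. Of course, you can also use an extender to achieve the same things as the two approaches above.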
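To make the webhook analogy concrete, here is a minimal sketch of an extender-style filter endpoint. `FilterArgs` and `FilterResult` are simplified, hypothetical stand-ins for the real request and response types, which carry more information. The "gpu-" node-name rule and the "/filter" path are likewise invented for illustration. The handler is exercised in-process with `net/http/httptest`; a real extender would serve it over HTTP(S).

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// FilterArgs is a hypothetical, simplified request body.
type FilterArgs struct {
	PodName   string   `json:"podName"`
	NodeNames []string `json:"nodeNames"`
}

// FilterResult is a hypothetical, simplified response body.
type FilterResult struct {
	NodeNames   []string          `json:"nodeNames"`
	FailedNodes map[string]string `json:"failedNodes"`
}

// filterHandler applies a custom predicate to the candidate nodes that the
// scheduler sends, and reports which nodes passed and why the others failed.
func filterHandler(w http.ResponseWriter, r *http.Request) {
	var args FilterArgs
	if err := json.NewDecoder(r.Body).Decode(&args); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	res := FilterResult{NodeNames: []string{}, FailedNodes: map[string]string{}}
	for _, name := range args.NodeNames {
		// Made-up rule: only nodes whose name starts with "gpu-" qualify.
		if strings.HasPrefix(name, "gpu-") {
			res.NodeNames = append(res.NodeNames, name)
		} else {
			res.FailedNodes[name] = "node has no GPU (hypothetical rule)"
		}
	}
	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(res)
}

// runDemo plays the scheduler's role: it posts candidate nodes to the
// handler in-process and decodes the extender's answer.
func runDemo() (FilterResult, error) {
	var res FilterResult
	body, err := json.Marshal(FilterArgs{PodName: "train-job", NodeNames: []string{"cpu-1", "gpu-1"}})
	if err != nil {
		return res, err
	}
	req := httptest.NewRequest(http.MethodPost, "/filter", bytes.NewReader(body))
	rec := httptest.NewRecorder()
	filterHandler(rec, req)
	if rec.Code != http.StatusOK {
		return res, fmt.Errorf("extender returned status %d", rec.Code)
	}
	err = json.NewDecoder(rec.Body).Decode(&res)
	return res, err
}

func main() {
	res, err := runDemo()
	if err != nil {
		panic(err)
	}
	fmt.Println(res.NodeNames, len(res.FailedNodes)) // prints "[gpu-1] 1"
}
```

Because the extender only exchanges JSON with the scheduler, it can be written, deployed and upgraded independently of kube-scheduler itself.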
Below we walk through the code to explain how the extender works: