·相關(guān)使用
接下來我們看一下kube-scheduler調(diào)度算法(預(yù)選&優(yōu)選)是如何與上述這些操作結(jié)合起來的:
// Fit is determined by resource availability.
// This predicate is actually a default predicate, because it is invoked from
// predicates.GeneralPredicates()
scheduler.RegisterFitPredicate(predicates.PodFitsResourcesPred, predicates.PodFitsResources)
...
// RegisterFitPredicate registers a fit predicate with the algorithm
// registry. Returns the name with which the predicate was registered.
func RegisterFitPredicate(name string, predicate predicates.FitPredicate) string {
return RegisterFitPredicateFactory(name, func(AlgorithmFactoryArgs) predicates.FitPredicate { return predicate })
}
...
// RegisterFitPredicateFactory registers a fit predicate factory with the
// algorithm registry. Returns the name with which the predicate was registered.
func RegisterFitPredicateFactory(name string, predicateFactory FitPredicateFactory) string {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
validateAlgorithmNameOrDie(name)
fitPredicateMap[name] = predicateFactory
return name
}
...
// Prioritizes nodes that have labels matching NodeAffinity
scheduler.RegisterPriorityMapReduceFunction(priorities.NodeAffinityPriority, priorities.CalculateNodeAffinityPriorityMap, priorities.CalculateNodeAffinityPriorityReduce, 1)
...
// RegisterPriorityMapReduceFunction registers a priority function with the algorithm registry. Returns the name,
// with which the function was registered.
func RegisterPriorityMapReduceFunction(
name string,
mapFunction priorities.PriorityMapFunction,
reduceFunction priorities.PriorityReduceFunction,
weight int) string {
return RegisterPriorityConfigFactory(name, PriorityConfigFactory{
MapReduceFunction: func(AlgorithmFactoryArgs) (priorities.PriorityMapFunction, priorities.PriorityReduceFunction) {
return mapFunction, reduceFunction
},
Weight: int64(weight),
})
}
...
// RegisterPriorityConfigFactory registers a priority config factory with its name.
func RegisterPriorityConfigFactory(name string, pcf PriorityConfigFactory) string {
schedulerFactoryMutex.Lock()
defer schedulerFactoryMutex.Unlock()
validateAlgorithmNameOrDie(name)
priorityFunctionMap[name] = pcf
return name
}
...
// (g.predicates)
// podFitsOnNode checks whether a node given by NodeInfo satisfies the given predicate functions.
// For given pod, podFitsOnNode will check if any equivalent pod exists and try to reuse its cached
// predicate results as possible.
// This function is called from two different places: Schedule and Preempt.
// When it is called from Schedule, we want to test whether the pod is schedulable
// on the node with all the existing pods on the node plus higher and equal priority
// pods nominated to run on the node.
// When it is called from Preempt, we should remove the victims of preemption and
// add the nominated pods. Removal of the victims is done by SelectVictimsOnNode().
// It removes victims from meta and NodeInfo before calling this function.
func (g *genericScheduler) podFitsOnNode(
ctx context.Context,
state *framework.CycleState,
pod *v1.Pod,
meta predicates.Metadata,
info *schedulernodeinfo.NodeInfo,
alwaysCheckAllPredicates bool,
) (bool, []predicates.PredicateFailureReason, *framework.Status, error) {
var failedPredicates []predicates.PredicateFailureReason
var status *framework.Status
podsAdded := false
// We run predicates twice in some cases. If the node has greater or equal priority
// nominated pods, we run them when those pods are added to meta and nodeInfo.
// If all predicates succeed in this pass, we run them again when these
// nominated pods are not added. This second pass is necessary because some
// predicates such as inter-pod affinity may not pass without the nominated pods.
// If there are no nominated pods for the node or if the first run of the
// predicates fail, we don't run the second pass.
// We consider only equal or higher priority pods in the first pass, because
// those are the current "pod" must yield to them and not take a space opened
// for running them. It is ok if the current "pod" take resources freed for
// lower priority pods.
// Requiring that the new pod is schedulable in both circumstances ensures that
// we are making a conservative decision: predicates like resources and inter-pod
// anti-affinity are more likely to fail when the nominated pods are treated
// as running, while predicates like pod affinity are more likely to fail when
// the nominated pods are treated as not running. We can't just assume the
// nominated pods are running because they are not running right now and in fact,
// they may end up getting scheduled to a different node.
for i := 0; i < 2; i++ {
...
for _, predicateKey := range predicates.Ordering() {
...
if predicate, exist := g.predicates[predicateKey]; exist {
fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
if err != nil {
return false, []predicates.PredicateFailureReason{}, nil, err
}
...
}
}
}
return len(failedPredicates) == 0 && status.IsSuccess(), failedPredicates, status, nil
}
...
// (g.prioritizers)
// prioritizeNodes prioritizes the nodes by running the individual priority functions in parallel.
// Each priority function is expected to set a score of 0-10
// 0 is the lowest priority score (least preferred node) and 10 is the highest
// Each priority function can also have its own weight
// The node scores returned by the priority function are multiplied by the weights to get weighted scores
// All scores are finally combined (added) to get the total weighted scores of all nodes
func (g *genericScheduler) prioritizeNodes(
ctx context.Context,
state *framework.CycleState,
pod *v1.Pod,
meta interface{},
nodes []*v1.Node,
) (framework.NodeScoreList, error) {
workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
nodeInfo := g.nodeInfoSnapshot.NodeInfoMap[nodes[index].Name]
for i := range g.prioritizers {
var err error
results[i][index], err = g.prioritizers[i].Map(pod, meta, nodeInfo)
if err != nil {
appendError(err)
results[i][index].Name = nodes[index].Name
}
}
})
for i := range g.prioritizers {
if g.prioritizers[i].Reduce == nil {
continue
}
wg.Add(1)
go func(index int) {
metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Inc()
defer func() {
metrics.SchedulerGoroutines.WithLabelValues("prioritizing_mapreduce").Dec()
wg.Done()
}()
if err := g.prioritizers[index].Reduce(pod, meta, g.nodeInfoSnapshot, results[index]); err != nil {
appendError(err)
}
if klog.V(10) {
for _, hostPriority := range results[index] {
klog.Infof("%v -> %v: %v, Score: (%d)", util.GetPodFullName(pod), hostPriority.Name, g.prioritizers[index].Name, hostPriority.Score)
}
}
}(i)
}
// Wait for all computations to be finished.
wg.Wait()
...
}
綜上,如果要在kube-scheduler基礎(chǔ)上添加策略,則按照如下步驟進(jìn)行添加:
·設(shè)置默認(rèn)預(yù)選&優(yōu)選策略:defaultPredicates以及defaultPriorities(k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/defaults.go)
·注冊預(yù)選和優(yōu)選相關(guān)處理函數(shù):注冊預(yù)選函數(shù)(k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/register_predicates.go);注冊優(yōu)選函數(shù)(k8s.io/kubernetes/pkg/scheduler/algorithmprovider/defaults/register_priorities.go)
·編寫預(yù)選和優(yōu)選處理函數(shù):編寫預(yù)選函數(shù)(k8s.io/kubernetes/pkg/scheduler/algorithm/predicates/predicates.go);編寫優(yōu)選函數(shù)Map+Reduce(k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/xxx.go)
·除了默認(rèn)設(shè)置預(yù)選&優(yōu)選外,還可以手動通過命令行--policy-config-file指定調(diào)度策略(會覆蓋默認(rèn)策略),例如examples/scheduler-policy-config.json
standalone
相比recoding只修改簡單代碼,standalone在kube-scheduler基礎(chǔ)上進(jìn)行重度二次定制,這種方式優(yōu)缺點如下:
·Pros
·滿足對scheduler最大程度的重構(gòu)&定制
·Cons
·實際工程中如果只是想添加預(yù)選或者優(yōu)選策略,則會切換到第一種方案,不會單獨開發(fā)和部署一個scheduler
·二次定制scheduler開發(fā)難度較大(至少對scheduler代碼非常熟悉),且對Kubernetes集群影響較大(無論是單獨部署,還是并列部署),后續(xù)升級和維護(hù)成本較高
·可能會產(chǎn)生調(diào)度沖突問題,在同時部署兩個scheduler時,可能會出現(xiàn)一個scheduler bind的時候?qū)嶋H資源已經(jīng)被另一個scheduler分配了
因此建議在其它方案滿足不了擴(kuò)展需求時,才采用standalone方案,且生產(chǎn)環(huán)境僅部署一個scheduler。
scheduler extender
對于Kubernetes項目來說,它很樂意開發(fā)者使用并向它提bug或者PR(受歡迎),但是不建議開發(fā)者為了實現(xiàn)業(yè)務(wù)需求直接修改Kubernetes核心代碼,因為這樣做會影響Kubernetes本身的代碼質(zhì)量以及穩(wěn)定性。因此Kubernetes希望盡可能通過外圍的方式來解決客戶自定義的需求。
其實任何好的項目都應(yīng)該這樣思考:盡可能抽取核心代碼,這部分代碼不應(yīng)該經(jīng)常變動或者說只能由maintainer改動(提高代碼質(zhì)量,減小項目本身開發(fā)&運維成本);將第三方客戶需求盡可能提取到外圍解決(滿足客戶自由),例如:插件的形式(eg:CNI,CRI,CSI and scheduler framework etc)。
上面介紹的default-scheduler recoding以及standalone方案都屬于侵入式的方案,不太優(yōu)雅;而scheduler extender以及scheduler framework屬于非侵入式的方案,這里重點介紹scheduler extender。
scheduler extender類似于webhook,kube-scheduler會在默認(rèn)調(diào)度算法執(zhí)行完成后以http/https的方式調(diào)用extender,extender server完成自定義的預(yù)選&優(yōu)選邏輯,并返回規(guī)定字段給scheduler,scheduler結(jié)合這些信息進(jìn)行最終的調(diào)度裁決,從而完成基于extender實現(xiàn)擴(kuò)展的邏輯。
scheduler extender適用于調(diào)度策略與非標(biāo)準(zhǔn)kube-scheduler管理資源相關(guān)的場景,當(dāng)然你也可以使用extender完成與上述兩種方式同樣的功能。
下面我們結(jié)合代碼說明extender的使用原理: