Unverified Commit 9e699175 authored by WuLei's avatar WuLei Committed by GitHub
Browse files

Merge pull request #1354 from huone1/tainttoleration-0.4

support taint toleration preferNoScheduler in release-0.4
No related merge requests found
Showing with 91 additions and 7 deletions
+91 -7
......@@ -17,9 +17,13 @@ limitations under the License.
package nodeorder
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/scheduler/algorithm/priorities"
schedulerapi "k8s.io/kubernetes/pkg/scheduler/api"
......@@ -42,6 +46,8 @@ const (
LeastRequestedWeight = "leastrequested.weight"
// BalancedResourceWeight is the key for providing Balanced Resource Priority Weight in YAML
BalancedResourceWeight = "balancedresource.weight"
// TaintTolerationWeight is the key for providing Most Requested Priority Weight in YAML
TaintTolerationWeight = "tainttoleration.weight"
)
type nodeOrderPlugin struct {
......@@ -63,6 +69,7 @@ type priorityWeight struct {
nodeAffinityWeight int
podAffinityWeight int
balancedRescourceWeight int
taintTolerationWeight int
}
func calculateWeight(args framework.Arguments) priorityWeight {
......@@ -86,6 +93,7 @@ func calculateWeight(args framework.Arguments) priorityWeight {
podaffinity.weight: 2
leastrequested.weight: 2
balancedresource.weight: 2
tainttoleration.weight: 2
*/
// Values are initialized to 1.
......@@ -94,6 +102,7 @@ func calculateWeight(args framework.Arguments) priorityWeight {
nodeAffinityWeight: 1,
podAffinityWeight: 1,
balancedRescourceWeight: 1,
taintTolerationWeight: 1,
}
// Checks whether nodeaffinity.weight is provided or not, if given, modifies the value in weight struct.
......@@ -108,6 +117,9 @@ func calculateWeight(args framework.Arguments) priorityWeight {
// Checks whether balancedresource.weight is provided or not, if given, modifies the value in weight struct.
args.GetInt(&weight.balancedRescourceWeight, BalancedResourceWeight)
// Checks whether tainttoleration.weight is provided or not, if given, modifies the value in weight struct.
args.GetInt(&weight.taintTolerationWeight, TaintTolerationWeight)
return weight
}
......@@ -195,18 +207,21 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
ssn.AddNodeOrderFn(pp.Name(), nodeOrderFn)
batchNodeOrderFn := func(task *api.TaskInfo, nodes []*api.NodeInfo) (map[string]float64, error) {
var interPodAffinityScore schedulerapi.HostPriorityList
PodAffinityScore, err := interPodAffinityScore(task.Pod, cn, nodeMap, nodeSlice, weight.podAffinityWeight)
if err != nil {
klog.Warningf("inter pod affinity score failed, Error: %v", err)
return nil, err
}
mapFn := priorities.NewInterPodAffinityPriority(cn, v1.DefaultHardPodAffinitySymmetricWeight)
interPodAffinityScore, err := mapFn(task.Pod, nodeMap, nodeSlice)
TolerationScores, err := taintTolerationScore(task.Pod, nodes, nodeMap, weight.taintTolerationWeight)
if err != nil {
klog.Warningf("Calculate Inter Pod Affinity Priority Failed because of Error: %v", err)
klog.Warningf("taint toleration score failed, Error: %v", err)
return nil, err
}
score := make(map[string]float64, len(interPodAffinityScore))
for _, host := range interPodAffinityScore {
score[host.Host] = float64(host.Score) * float64(weight.podAffinityWeight)
score := make(map[string]float64, len(nodes))
for _, node := range nodes {
score[node.Name] = TolerationScores[node.Name] + PodAffinityScore[node.Name]
}
klog.V(4).Infof("Batch Total Score for task %s/%s is: %v", task.Namespace, task.Name, score)
......@@ -215,6 +230,75 @@ func (pp *nodeOrderPlugin) OnSessionOpen(ssn *framework.Session) {
ssn.AddBatchNodeOrderFn(pp.Name(), batchNodeOrderFn)
}
func interPodAffinityScore(
pod *v1.Pod,
cn *cachedNodeInfo,
nodeMap map[string]*schedulernodeinfo.NodeInfo,
nodeSlice []*v1.Node,
weight int,
) (map[string]float64, error) {
var interPodAffinityScore schedulerapi.HostPriorityList
mapFn := priorities.NewInterPodAffinityPriority(cn, v1.DefaultHardPodAffinitySymmetricWeight)
interPodAffinityScore, err := mapFn(pod, nodeMap, nodeSlice)
if err != nil {
klog.Warningf("Calculate Inter Pod Affinity Priority Failed because of Error: %v", err)
return nil, err
}
score := make(map[string]float64, len(interPodAffinityScore))
for _, host := range interPodAffinityScore {
score[host.Host] = float64(host.Score) * float64(weight)
}
klog.V(4).Infof("inter pod affinity Score for pod %s/%s is: %v", pod.Namespace, pod.Name, score)
return score, nil
}
func taintTolerationScore(
pod *v1.Pod,
candidateNodes []*api.NodeInfo,
nodeMap map[string]*schedulernodeinfo.NodeInfo,
weight int,
) (map[string]float64, error) {
nodes := make([]*schedulernodeinfo.NodeInfo, 0, len(candidateNodes))
for _, node := range candidateNodes {
nodes = append(nodes, nodeMap[node.Name])
}
result := make(schedulerapi.HostPriorityList, 0, len(nodes))
errCh := make(chan error, 1)
workqueue.ParallelizeUntil(context.TODO(), 16, len(nodes), func(index int) {
score, err := priorities.ComputeTaintTolerationPriorityMap(pod, nil, nodes[index])
if err != nil {
errCh <- fmt.Errorf("calculate taint toleration priority failed %v", err)
return
}
result = append(result, score)
})
select {
case err := <-errCh:
return nil, err
default:
}
err := priorities.ComputeTaintTolerationPriorityReduce(pod, nil, nil, result)
if err != nil {
return nil, fmt.Errorf("ComputeTaintTolerationPriorityReduce %v", err)
}
score := make(map[string]float64, len(result))
for _, host := range result {
score[host.Host] = float64(host.Score) * float64(weight)
}
klog.V(4).Infof("taint toleration score for pod %s/%s is: %v", pod.Namespace, pod.Name, score)
return score, nil
}
func (pp *nodeOrderPlugin) OnSessionClose(ssn *framework.Session) {
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment