Unverified Commit 0de7d7be authored by stormgbs's avatar stormgbs Committed by GitHub
Browse files

QoS Manager Framework (#590)

Signed-off-by: default avatarstormgbs <stormgbs@gmail.com>
Showing with 1120 additions and 0 deletions
+1120 -0
......@@ -68,6 +68,12 @@ func main() {
os.Exit(1)
}
// Init config from ConfigMap.
if err = cfg.InitFromConfigMap(); err != nil {
klog.Error("Unable to init config from ConfigMap: ", err)
os.Exit(1)
}
d, err := agent.NewDaemon(cfg)
if err != nil {
klog.Error("Unable to setup koordlet daemon: ", err)
......
......@@ -17,8 +17,12 @@ limitations under the License.
package features
import (
"fmt"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/component-base/featuregate"
slov1alpha1 "github.com/koordinator-sh/koordinator/apis/slo/v1alpha1"
)
const (
......@@ -75,3 +79,21 @@ var (
Accelerators: {Default: false, PreRelease: featuregate.Alpha},
}
)
// IsFeatureDisabled returns whether the featuregate is disabled by nodeSLO config
func IsFeatureDisabled(nodeSLO *slov1alpha1.NodeSLO, feature featuregate.Feature) (bool, error) {
if nodeSLO == nil || nodeSLO.Spec == (slov1alpha1.NodeSLOSpec{}) {
return true, fmt.Errorf("cannot parse feature config for invalid nodeSLO %v", nodeSLO)
}
spec := nodeSLO.Spec
switch feature {
case BECPUSuppress, BEMemoryEvict, BECPUEvict:
if spec.ResourceUsedThresholdWithBE == nil || spec.ResourceUsedThresholdWithBE.Enable == nil {
return true, fmt.Errorf("cannot parse feature config for invalid nodeSLO %v", nodeSLO)
}
return !(*spec.ResourceUsedThresholdWithBE.Enable), nil
default:
return true, fmt.Errorf("cannot parse feature config for unsupported feature %s", feature)
}
}
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reason
const (
UpdateCPU = "UpdateCPU"
UpdateMemory = "UpdateMemory"
UpdateCgroups = "UpdateCgroups" // update cgroups excluding the options already stated above
UpdateSystemConfig = "UpdateSystemConfig"
UpdateResctrlSchemata = "UpdateResctrlSchemata" // update resctrl l3 cat schemata
UpdateResctrlTasks = "UpdateResctrlTasks" // update resctrl tasks
EvictPodByNodeMemoryUsage = "EvictPodByNodeMemoryUsage"
EvictPodByBECPUSatisfaction = "EvictPodByBECPUSatisfaction"
AdjustBEByNodeCPUUsage = "AdjustBEByNodeCPUUsage"
EvictPodSuccess = "evictPodSuccess"
EvictPodFail = "evictPodFail"
)
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testutil
import (
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
apiext "github.com/koordinator-sh/koordinator/apis/extension"
)
type FakeRecorder struct {
eventReason string
}
func (f *FakeRecorder) Event(object runtime.Object, eventType, reason, message string) {
f.eventReason = reason
fmt.Printf("send event:eventType:%s,reason:%s,message:%s", eventType, reason, message)
}
func (f *FakeRecorder) Eventf(object runtime.Object, eventType, reason, messageFmt string, args ...interface{}) {
f.eventReason = reason
fmt.Printf("send event:eventType:%s,reason:%s,message:%s", eventType, reason, messageFmt)
}
func (f *FakeRecorder) AnnotatedEventf(object runtime.Object, annotations map[string]string, eventType, reason, messageFmt string, args ...interface{}) {
f.Eventf(object, eventType, reason, messageFmt, args...)
}
func MockTestNode(cpu, memory string) *corev1.Node {
return &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "test-node",
Namespace: "default",
},
Status: corev1.NodeStatus{
Allocatable: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse(cpu),
corev1.ResourceMemory: resource.MustParse(memory),
},
Capacity: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse(cpu),
corev1.ResourceMemory: resource.MustParse(memory),
},
},
}
}
func MockTestPod(qosClass apiext.QoSClass, name string) *corev1.Pod {
return &corev1.Pod{
TypeMeta: metav1.TypeMeta{Kind: "Pod"},
ObjectMeta: metav1.ObjectMeta{
Name: name,
UID: types.UID(name),
Labels: map[string]string{
apiext.LabelPodQoS: string(qosClass),
},
},
}
}
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package types
const (
DefaultCFSPeriod = 100000
DefaultMemUnlimit = 9223372036854771712
)
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package utils
import (
corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"github.com/koordinator-sh/koordinator/pkg/util"
"github.com/koordinator-sh/koordinator/pkg/util/runtime"
)
// KillContainers kills containers inside the pod
func KillContainers(pod *corev1.Pod, message string) {
for _, container := range pod.Spec.Containers {
containerID, containerStatus, err := util.FindContainerIdAndStatusByName(&pod.Status, container.Name)
if err != nil {
klog.Errorf("failed to find container id and status, error: %v", err)
return
}
if containerStatus == nil || containerStatus.State.Running == nil {
return
}
if containerID != "" {
runtimeType, _, _ := util.ParseContainerId(containerStatus.ContainerID)
runtimeHandler, err := runtime.GetRuntimeHandler(runtimeType)
if err != nil || runtimeHandler == nil {
klog.Errorf("%s, kill container(%s) error! GetRuntimeHandler fail! error: %v", message, containerStatus.ContainerID, err)
continue
}
if err := runtimeHandler.StopContainer(containerID, 0); err != nil {
klog.Errorf("%s, stop container error! error: %v", message, err)
}
} else {
klog.Warningf("%s, get container ID failed, pod %s/%s containerName %s status: %v", message, pod.Namespace, pod.Name, container.Name, pod.Status.ContainerStatuses)
}
}
}
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package utils
import (
corev1 "k8s.io/api/core/v1"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
"github.com/koordinator-sh/koordinator/pkg/util"
)
func GetPodMetas(pods []*corev1.Pod) []*statesinformer.PodMeta {
podMetas := make([]*statesinformer.PodMeta, len(pods))
for index, pod := range pods {
cgroupDir := util.GetPodKubeRelativePath(pod)
podMeta := &statesinformer.PodMeta{CgroupDir: cgroupDir, Pod: pod.DeepCopy()}
podMetas[index] = podMeta
}
return podMetas
}
......@@ -17,9 +17,14 @@ limitations under the License.
package config
import (
"context"
"encoding/json"
"errors"
"flag"
"strings"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
cliflag "k8s.io/component-base/cli/flag"
"sigs.k8s.io/controller-runtime/pkg/client/config"
......@@ -28,6 +33,7 @@ import (
"github.com/koordinator-sh/koordinator/pkg/koordlet/audit"
"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
"github.com/koordinator-sh/koordinator/pkg/koordlet/metricsadvisor"
qosmanagerconfig "github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager/config"
"github.com/koordinator-sh/koordinator/pkg/koordlet/reporter"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resmanager"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks"
......@@ -35,13 +41,23 @@ import (
"github.com/koordinator-sh/koordinator/pkg/util/system"
)
const (
DefaultKoordletConfigMapNamespace = "koordinator-system"
DefaultKoordletConfigMapName = "koordlet-config"
CMKeyQoSPluginExtraConfigs = "qos-plugin-extra-configs"
)
type Configuration struct {
ConfigMapName string
ConfigMapNamesapce string
KubeRestConf *rest.Config
StatesInformerConf *statesinformer.Config
ReporterConf *reporter.Config
CollectorConf *metricsadvisor.Config
MetricCacheConf *metriccache.Config
ResManagerConf *resmanager.Config
QosManagerConf *qosmanagerconfig.Config
RuntimeHookConf *runtimehooks.Config
AuditConf *audit.Config
FeatureGates map[string]bool
......@@ -49,23 +65,29 @@ type Configuration struct {
func NewConfiguration() *Configuration {
return &Configuration{
ConfigMapName: DefaultKoordletConfigMapName,
ConfigMapNamesapce: DefaultKoordletConfigMapNamespace,
StatesInformerConf: statesinformer.NewDefaultConfig(),
ReporterConf: reporter.NewDefaultConfig(),
CollectorConf: metricsadvisor.NewDefaultConfig(),
MetricCacheConf: metriccache.NewDefaultConfig(),
ResManagerConf: resmanager.NewDefaultConfig(),
QosManagerConf: qosmanagerconfig.NewDefaultConfig(),
RuntimeHookConf: runtimehooks.NewDefaultConfig(),
AuditConf: audit.NewDefaultConfig(),
}
}
func (c *Configuration) InitFlags(fs *flag.FlagSet) {
fs.StringVar(&c.ConfigMapName, "configmap-name", DefaultKoordletConfigMapName, "determines the name the koordlet configmap uses.")
fs.StringVar(&c.ConfigMapNamesapce, "configmap-namespace", DefaultKoordletConfigMapNamespace, "determines the namespace of configmap uses.")
system.Conf.InitFlags(fs)
c.StatesInformerConf.InitFlags(fs)
c.ReporterConf.InitFlags(fs)
c.CollectorConf.InitFlags(fs)
c.MetricCacheConf.InitFlags(fs)
c.ResManagerConf.InitFlags(fs)
c.QosManagerConf.InitFlags(fs)
c.RuntimeHookConf.InitFlags(fs)
c.AuditConf.InitFlags(fs)
fs.Var(cliflag.NewMapStringBool(&c.FeatureGates), "feature-gates", "A set of key=value pairs that describe feature gates for alpha/experimental features. "+
......@@ -81,3 +103,28 @@ func (c *Configuration) InitClient() error {
c.KubeRestConf = cfg
return nil
}
func (c *Configuration) InitFromConfigMap() error {
if c.KubeRestConf == nil {
return errors.New("KubeRestConf is nil")
}
cli, err := kubernetes.NewForConfig(c.KubeRestConf)
if err != nil {
return err
}
cm, err := cli.CoreV1().ConfigMaps(c.ConfigMapNamesapce).Get(context.TODO(), c.ConfigMapName, metav1.GetOptions{})
if err != nil {
return err
}
// Setup extra configs for QoS Manager.
if qosPluginExtraConfigRaw, found := cm.Data[CMKeyQoSPluginExtraConfigs]; found {
var extraConfigs map[string]string
if err = json.Unmarshal([]byte(qosPluginExtraConfigRaw), &extraConfigs); err != nil {
return err
}
c.QosManagerConf.PluginExtraConfigs = extraConfigs
}
return nil
}
......@@ -37,6 +37,7 @@ import (
"github.com/koordinator-sh/koordinator/pkg/koordlet/metrics"
"github.com/koordinator-sh/koordinator/pkg/koordlet/metricsadvisor"
"github.com/koordinator-sh/koordinator/pkg/koordlet/pleg"
"github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager"
"github.com/koordinator-sh/koordinator/pkg/koordlet/reporter"
"github.com/koordinator-sh/koordinator/pkg/koordlet/resmanager"
"github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks"
......@@ -62,6 +63,7 @@ type daemon struct {
metricCache metriccache.MetricCache
reporter reporter.Reporter
resManager resmanager.ResManager
qosManager qosmanager.QoSManager
runtimeHook runtimehooks.RuntimeHook
pleg pleg.Pleg
}
......@@ -124,6 +126,8 @@ func NewDaemon(config *config.Configuration) (Daemon, error) {
resManagerService := resmanager.NewResManager(config.ResManagerConf, scheme, kubeClient, crdClient, nodeName, statesInformer, metricCache, int64(config.CollectorConf.CollectResUsedIntervalSeconds))
qosManager := qosmanager.NewQosManager(config.QosManagerConf, scheme, kubeClient, nodeName, statesInformer, metricCache)
runtimeHook, err := runtimehooks.NewRuntimeHook(statesInformer, config.RuntimeHookConf)
if err != nil {
return nil, err
......@@ -135,6 +139,7 @@ func NewDaemon(config *config.Configuration) (Daemon, error) {
metricCache: metricCache,
reporter: reporterService,
resManager: resManagerService,
qosManager: qosManager,
runtimeHook: runtimeHook,
pleg: pleg,
}
......@@ -191,6 +196,14 @@ func (d *daemon) Run(stopCh <-chan struct{}) {
}
}()
// start QoS Manager
go func() {
if err := d.qosManager.Run(stopCh); err != nil {
klog.Error("Unable to run the QoSManager: ", err)
os.Exit(1)
}
}()
go func() {
if err := d.runtimeHook.Run(stopCh); err != nil {
klog.Errorf("Unable to run the runtimeHook: ", err)
......
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"flag"
"strings"
"k8s.io/apimachinery/pkg/util/runtime"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/featuregate"
"github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager/plugins"
)
var (
DefaultMutableQoSManagerFG featuregate.MutableFeatureGate = featuregate.NewFeatureGate()
DefaultQoSManagerFG featuregate.FeatureGate = DefaultMutableQoSManagerFG
defaultQoSManagerFG = map[featuregate.Feature]featuregate.FeatureSpec{}
QoSPluginFactories = map[featuregate.Feature]plugins.PluginFactoryFn{}
)
type Config struct {
FeatureGates map[string]bool
PluginExtraConfigs map[string]string
}
func NewDefaultConfig() *Config {
return &Config{
FeatureGates: map[string]bool{},
PluginExtraConfigs: map[string]string{},
}
}
func (c *Config) InitFlags(fs *flag.FlagSet) {
fs.Var(cliflag.NewMapStringBool(&c.FeatureGates), "qos-plugins",
"A set of key=value pairs that describe feature gates for QoS Manager plugins alpha/experimental features. "+
"Options are:\n"+strings.Join(DefaultQoSManagerFG.KnownFeatures(), "\n"))
}
func init() {
runtime.Must(DefaultMutableQoSManagerFG.Add(defaultQoSManagerFG))
}
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"flag"
"testing"
"github.com/stretchr/testify/assert"
)
func Test_NewDefaultConfig(t *testing.T) {
expectConfig := &Config{
FeatureGates: map[string]bool{},
PluginExtraConfigs: map[string]string{},
}
defaultConfig := NewDefaultConfig()
assert.Equal(t, expectConfig, defaultConfig)
}
func Test_InitFlags(t *testing.T) {
cfg := NewDefaultConfig()
cfg.InitFlags(flag.CommandLine)
flag.Parse()
}
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package k8s
import (
"context"
"fmt"
"sync"
corev1 "k8s.io/api/core/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/klog/v2"
"github.com/koordinator-sh/koordinator/pkg/koordlet/audit"
"github.com/koordinator-sh/koordinator/pkg/koordlet/common/reason"
"github.com/koordinator-sh/koordinator/pkg/koordlet/metrics"
expireCache "github.com/koordinator-sh/koordinator/pkg/tools/cache"
)
type K8sClient interface {
Run(<-chan struct{}) error
EvictPods(evictPods []*corev1.Pod, node *corev1.Node, reason string, message string)
}
var _ K8sClient = &k8sClient{}
func NewK8sClient(kubeClinet kubernetes.Interface, eventRecorder record.EventRecorder) K8sClient {
return &k8sClient{
kubeClient: kubeClinet,
eventRecorder: eventRecorder,
podsEvicted: expireCache.NewCacheDefault(),
}
}
type k8sClient struct {
kubeClient kubernetes.Interface
eventRecorder record.EventRecorder
podsEvicted *expireCache.Cache
started bool
lock sync.RWMutex
}
// Run impl interface K8sClient.
func (r *k8sClient) Run(stop <-chan struct{}) error {
r.lock.Lock()
defer r.lock.Unlock()
if r.started {
return nil
}
return r.podsEvicted.Run(stop)
}
// EvictPods impl interface K8sClient.
func (r *k8sClient) EvictPods(evictPods []*corev1.Pod, node *corev1.Node, reason string, message string) {
for _, evictPod := range evictPods {
r.evictPodIfNotEvicted(evictPod, node, reason, message)
}
}
func (r *k8sClient) evictPodIfNotEvicted(evictPod *corev1.Pod, node *corev1.Node, reason string, message string) {
_, evicted := r.podsEvicted.Get(string(evictPod.UID))
if evicted {
klog.V(5).Infof("Pod has been evicted! podID: %v, evict reason: %s", evictPod.UID, reason)
return
}
success := r.evictPod(evictPod, node, reason, message)
if success {
_ = r.podsEvicted.SetDefault(string(evictPod.UID), evictPod.UID)
}
}
func (r *k8sClient) evictPod(evictPod *corev1.Pod, node *corev1.Node, reasonMsg string, message string) bool {
podEvictMessage := fmt.Sprintf("evict Pod:%s, reason: %s, message: %v", evictPod.Name, reasonMsg, message)
_ = audit.V(0).Pod(evictPod.Namespace, evictPod.Name).Reason(reasonMsg).Message(message).Do()
podEvict := policyv1.Eviction{
ObjectMeta: metav1.ObjectMeta{
Name: evictPod.Name,
Namespace: evictPod.Namespace,
},
}
if err := r.kubeClient.CoreV1().Pods(evictPod.Namespace).EvictV1(context.TODO(), &podEvict); err == nil {
r.eventRecorder.Eventf(node, corev1.EventTypeWarning, reason.EvictPodSuccess, podEvictMessage)
metrics.RecordPodEviction(reasonMsg)
klog.Infof("evict pod %v/%v success, reason: %v", evictPod.Namespace, evictPod.Name, reasonMsg)
return true
} else if !errors.IsNotFound(err) {
r.eventRecorder.Eventf(node, corev1.EventTypeWarning, reason.EvictPodFail, podEvictMessage)
klog.Errorf("evict pod %v/%v failed, reason: %v, error: %v", evictPod.Namespace, evictPod.Name, reasonMsg, err)
return false
}
return true
}
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package qosmanager
import (
"fmt"
corev1 "k8s.io/api/core/v1"
apiruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
clientcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
"k8s.io/component-base/featuregate"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"
"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
"github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager/config"
"github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager/k8s"
"github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager/metricsquery"
"github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager/plugins"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
)
type QoSManager interface {
Run(stopCh <-chan struct{}) error
}
func NewQosManager(cfg *config.Config, schema *apiruntime.Scheme, kubeClient kubernetes.Interface, nodeName string,
statesInformer statesinformer.StatesInformer, metricCache metriccache.MetricCache) QoSManager {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&clientcorev1.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(schema, corev1.EventSource{Component: "koordlet-qos-manager", Host: nodeName})
return &qosManager{
cfg: cfg,
nodeName: nodeName,
pluginCtx: &plugins.PluginContext{
K8sClient: k8s.NewK8sClient(kubeClient, recorder),
StatesInformer: statesInformer,
MetricCache: metricCache,
MetricsQuery: metricsquery.NewMetricsQuery(metricCache, statesInformer),
},
}
}
type qosManager struct {
cfg *config.Config
nodeName string
pluginCtx *plugins.PluginContext
plugins map[featuregate.Feature]plugins.Plugin
}
func (m *qosManager) Run(stopCh <-chan struct{}) error {
klog.Infof("Start running QoS Manager")
for fgStr, enable := range m.cfg.FeatureGates {
if !enable {
continue
}
fg := featuregate.Feature(fgStr)
if _, found := m.plugins[fg]; found {
return fmt.Errorf("duplicated plugin for %v", fg)
}
pluginCtx := &plugins.PluginContext{
K8sClient: m.pluginCtx.K8sClient,
StatesInformer: m.pluginCtx.StatesInformer,
MetricCache: m.pluginCtx.MetricCache,
MetricsQuery: m.pluginCtx.MetricsQuery,
}
if extraConfig, found := m.cfg.PluginExtraConfigs[string(fg)]; found && extraConfig != "" {
pluginCtx.ExtraConfig = pointer.StringPtr(extraConfig)
}
pluginFactory, found := config.QoSPluginFactories[fg]
if !found {
return fmt.Errorf("PluginFactory for %v not found", fg)
}
m.plugins[fg] = pluginFactory(pluginCtx)
}
for fg, pl := range m.plugins {
klog.Infof("\t Start running qos plugin: %v", fg)
if err := pl.Start(); err != nil {
klog.Fatalf("failed to start qos plugin %v, error: %v", fg, err)
}
}
return nil
}
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package qosmanager
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metricsquery
import (
"fmt"
"time"
"k8s.io/klog/v2"
"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
)
type MetricsQuery interface {
CollectNodeAndPodMetricLast(collectResUsedIntervalSeconds int64) (
*metriccache.NodeResourceMetric, []*metriccache.PodResourceMetric)
CollectNodeMetricsAvg(windowSeconds int64) metriccache.NodeResourceQueryResult
CollectNodeAndPodMetrics(queryParam *metriccache.QueryParam) (
*metriccache.NodeResourceMetric, []*metriccache.PodResourceMetric)
CollectContainerResMetricLast(containerID *string, collectResUsedIntervalSeconds int64) metriccache.ContainerResourceQueryResult
CollectContainerThrottledMetricLast(containerID *string, collectResUsedIntervalSeconds int64) metriccache.ContainerThrottledQueryResult
CollectPodMetric(podMeta *statesinformer.PodMeta, queryParam *metriccache.QueryParam) metriccache.PodResourceQueryResult
}
var _ MetricsQuery = &metricsQuery{}
// NewMetricsQuery creates an instance which implements interface MetricsQuery.
func NewMetricsQuery(metricCache metriccache.MetricCache, statesInformer statesinformer.StatesInformer) MetricsQuery {
return &metricsQuery{
metricCache: metricCache,
statesInformer: statesInformer,
}
}
type metricsQuery struct {
metricCache metriccache.MetricCache
statesInformer statesinformer.StatesInformer
}
// CollectNodeMetricsAvg impl plugins.MetricQuery interface.
func (r *metricsQuery) CollectNodeMetricsAvg(windowSeconds int64) metriccache.NodeResourceQueryResult {
queryParam := GenerateQueryParamsAvg(windowSeconds)
return r.collectNodeMetric(queryParam)
}
// CollectNodeAndPodMetricLast impl plugins.MetricQuery interface.
func (r *metricsQuery) CollectNodeAndPodMetricLast(collectResUsedIntervalSeconds int64) (
*metriccache.NodeResourceMetric, []*metriccache.PodResourceMetric) {
queryParam := GenerateQueryParamsLast(collectResUsedIntervalSeconds * 2)
return r.CollectNodeAndPodMetrics(queryParam)
}
// CollectNodeAndPodMetrics impl plugins.MetricQuery interface.
func (r *metricsQuery) CollectNodeAndPodMetrics(queryParam *metriccache.QueryParam) (
*metriccache.NodeResourceMetric, []*metriccache.PodResourceMetric) {
// collect node's and all pods' metrics with the same query param
nodeQueryResult := r.collectNodeMetric(queryParam)
nodeMetric := nodeQueryResult.Metric
podsMeta := r.statesInformer.GetAllPods()
podsMetrics := make([]*metriccache.PodResourceMetric, 0, len(podsMeta))
for _, podMeta := range podsMeta {
podQueryResult := r.CollectPodMetric(podMeta, queryParam)
podMetric := podQueryResult.Metric
if podMetric != nil {
podsMetrics = append(podsMetrics, podMetric)
}
}
return nodeMetric, podsMetrics
}
func (r *metricsQuery) collectNodeMetric(queryParam *metriccache.QueryParam) metriccache.NodeResourceQueryResult {
queryResult := r.metricCache.GetNodeResourceMetric(queryParam)
if queryResult.Error != nil {
klog.Warningf("get node resource metric failed, error %v", queryResult.Error)
return queryResult
}
if queryResult.Metric == nil {
klog.Warningf("node metric not exist")
return queryResult
}
return queryResult
}
func (r *metricsQuery) CollectPodMetric(podMeta *statesinformer.PodMeta,
queryParam *metriccache.QueryParam) metriccache.PodResourceQueryResult {
if podMeta == nil || podMeta.Pod == nil {
return metriccache.PodResourceQueryResult{QueryResult: metriccache.QueryResult{Error: fmt.Errorf("pod is nil")}}
}
podUID := string(podMeta.Pod.UID)
queryResult := r.metricCache.GetPodResourceMetric(&podUID, queryParam)
if queryResult.Error != nil {
klog.Warningf("get pod %v resource metric failed, error %v", podUID, queryResult.Error)
return queryResult
}
if queryResult.Metric == nil {
klog.Warningf("pod %v metric not exist", podUID)
return queryResult
}
return queryResult
}
// CollectContainerResMetricLast creates an instance which implements interface MetricsQuery.
func (r *metricsQuery) CollectContainerResMetricLast(containerID *string, collectResUsedIntervalSeconds int64) metriccache.ContainerResourceQueryResult {
if containerID == nil {
return metriccache.ContainerResourceQueryResult{
QueryResult: metriccache.QueryResult{Error: fmt.Errorf("container is nil")},
}
}
queryParam := GenerateQueryParamsLast(collectResUsedIntervalSeconds * 2)
queryResult := r.metricCache.GetContainerResourceMetric(containerID, queryParam)
if queryResult.Error != nil {
klog.Warningf("get container %v resource metric failed, error %v", containerID, queryResult.Error)
return queryResult
}
if queryResult.Metric == nil {
klog.Warningf("container %v metric not exist", containerID)
return queryResult
}
return queryResult
}
// CollectContainerThrottledMetricLast creates an instance which implements interface MetricsQuery.
func (r *metricsQuery) CollectContainerThrottledMetricLast(containerID *string, collectResUsedIntervalSeconds int64) metriccache.ContainerThrottledQueryResult {
if containerID == nil {
return metriccache.ContainerThrottledQueryResult{
QueryResult: metriccache.QueryResult{Error: fmt.Errorf("container is nil")},
}
}
queryParam := GenerateQueryParamsLast(collectResUsedIntervalSeconds * 2)
queryResult := r.metricCache.GetContainerThrottledMetric(containerID, queryParam)
if queryResult.Error != nil {
klog.Warningf("get container %v throttled metric failed, error %v", containerID, queryResult.Error)
return queryResult
}
if queryResult.Metric == nil {
klog.Warningf("container %v metric not exist", containerID)
return queryResult
}
return queryResult
}
func GenerateQueryParamsAvg(windowSeconds int64) *metriccache.QueryParam {
end := time.Now()
start := end.Add(-time.Duration(windowSeconds) * time.Second)
queryParam := &metriccache.QueryParam{
Aggregate: metriccache.AggregationTypeAVG,
Start: &start,
End: &end,
}
return queryParam
}
func GenerateQueryParamsLast(windowSeconds int64) *metriccache.QueryParam {
end := time.Now()
start := end.Add(-time.Duration(windowSeconds) * time.Second)
queryParam := &metriccache.QueryParam{
Aggregate: metriccache.AggregationTypeLast,
Start: &start,
End: &end,
}
return queryParam
}
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metricsquery
import (
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
"k8s.io/apimachinery/pkg/api/resource"
"github.com/koordinator-sh/koordinator/apis/extension"
"github.com/koordinator-sh/koordinator/pkg/koordlet/common/testutil"
"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
mock_statesinformer "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer/mockstatesinformer"
)
func Test_collectNodeMetricsAvg(t *testing.T) {
type nodeMetric struct {
timestamp time.Time
nodeResUsed *metriccache.NodeResourceMetric
}
type args struct {
name string
nodeMetrics []*nodeMetric
windowSize int64
expectNodeMetric *metriccache.NodeResourceMetric
}
tests := []args{
{
name: "test no metrics in db",
windowSize: 4,
expectNodeMetric: nil,
},
{
name: "test windowSize < dataInterval",
nodeMetrics: []*nodeMetric{
{
timestamp: time.Now().Add(-3 * time.Second),
nodeResUsed: &metriccache.NodeResourceMetric{
CPUUsed: metriccache.CPUMetric{CPUUsed: resource.MustParse("14")},
MemoryUsed: metriccache.MemoryMetric{MemoryWithoutCache: resource.MustParse("60G")},
},
},
{
timestamp: time.Now().Add(-1 * time.Second),
nodeResUsed: &metriccache.NodeResourceMetric{
CPUUsed: metriccache.CPUMetric{CPUUsed: resource.MustParse("16")},
MemoryUsed: metriccache.MemoryMetric{MemoryWithoutCache: resource.MustParse("70G")},
},
},
},
windowSize: 1,
expectNodeMetric: nil,
},
{
name: "test windowSize > dataInterval",
nodeMetrics: []*nodeMetric{
{
timestamp: time.Now().Add(-7 * time.Second),
nodeResUsed: &metriccache.NodeResourceMetric{
CPUUsed: metriccache.CPUMetric{CPUUsed: resource.MustParse("10")},
MemoryUsed: metriccache.MemoryMetric{MemoryWithoutCache: resource.MustParse("40G")},
},
},
{
timestamp: time.Now().Add(-5 * time.Second),
nodeResUsed: &metriccache.NodeResourceMetric{
CPUUsed: metriccache.CPUMetric{CPUUsed: resource.MustParse("12")},
MemoryUsed: metriccache.MemoryMetric{MemoryWithoutCache: resource.MustParse("50G")},
},
},
{
timestamp: time.Now().Add(-3 * time.Second),
nodeResUsed: &metriccache.NodeResourceMetric{
CPUUsed: metriccache.CPUMetric{CPUUsed: resource.MustParse("14")},
MemoryUsed: metriccache.MemoryMetric{MemoryWithoutCache: resource.MustParse("60G")},
},
},
{
timestamp: time.Now().Add(-1 * time.Second),
nodeResUsed: &metriccache.NodeResourceMetric{
CPUUsed: metriccache.CPUMetric{CPUUsed: resource.MustParse("16")},
MemoryUsed: metriccache.MemoryMetric{MemoryWithoutCache: resource.MustParse("70G")},
},
},
},
windowSize: 4,
expectNodeMetric: &metriccache.NodeResourceMetric{
CPUUsed: metriccache.CPUMetric{CPUUsed: *resource.NewMilliQuantity(15000, resource.DecimalSI)},
MemoryUsed: metriccache.MemoryMetric{MemoryWithoutCache: *resource.NewQuantity(65000000000, resource.BinarySI)},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
metricCache, _ := metriccache.NewCacheNotShareMetricCache(&metriccache.Config{MetricGCIntervalSeconds: 1, MetricExpireSeconds: 1})
for _, nodeMetric := range tt.nodeMetrics {
_ = metricCache.InsertNodeResourceMetric(nodeMetric.timestamp, nodeMetric.nodeResUsed)
}
metricsQuery := NewMetricsQuery(metricCache, nil)
queryResult := metricsQuery.CollectNodeMetricsAvg(tt.windowSize)
gotNodeMetric := queryResult.Metric
assert.Equal(t, tt.expectNodeMetric, gotNodeMetric)
})
}
}
func Test_collectNodeAndPodMetricLast(t *testing.T) {
type metricInfos struct {
timestamp time.Time
nodeResUsed *metriccache.NodeResourceMetric
podResUsed *metriccache.PodResourceMetric
}
type args struct {
name string
metricInfos []*metricInfos
pod *statesinformer.PodMeta
expectNodeMetric *metriccache.NodeResourceMetric
expectPodMetric *metriccache.PodResourceMetric
}
tests := []args{
{
name: "test no metrics in db",
pod: &statesinformer.PodMeta{Pod: testutil.MockTestPod(extension.QoSLSR, "test_pod")},
expectNodeMetric: nil,
expectPodMetric: nil,
},
{
name: "test normal",
pod: &statesinformer.PodMeta{Pod: testutil.MockTestPod(extension.QoSLSR, "test_pod")},
metricInfos: []*metricInfos{
{
timestamp: time.Now().Add(-3 * time.Second),
nodeResUsed: &metriccache.NodeResourceMetric{
CPUUsed: metriccache.CPUMetric{CPUUsed: resource.MustParse("14")},
MemoryUsed: metriccache.MemoryMetric{MemoryWithoutCache: resource.MustParse("60G")},
},
podResUsed: &metriccache.PodResourceMetric{
PodUID: "test_pod",
CPUUsed: metriccache.CPUMetric{CPUUsed: resource.MustParse("14")},
MemoryUsed: metriccache.MemoryMetric{MemoryWithoutCache: resource.MustParse("60G")},
},
},
{
timestamp: time.Now().Add(-1 * time.Second),
nodeResUsed: &metriccache.NodeResourceMetric{
CPUUsed: metriccache.CPUMetric{CPUUsed: resource.MustParse("16")},
MemoryUsed: metriccache.MemoryMetric{MemoryWithoutCache: resource.MustParse("70G")},
},
podResUsed: &metriccache.PodResourceMetric{
PodUID: "test_pod",
CPUUsed: metriccache.CPUMetric{CPUUsed: resource.MustParse("16")},
MemoryUsed: metriccache.MemoryMetric{MemoryWithoutCache: resource.MustParse("70G")},
},
},
},
expectNodeMetric: &metriccache.NodeResourceMetric{
CPUUsed: metriccache.CPUMetric{CPUUsed: *resource.NewMilliQuantity(16000, resource.DecimalSI)},
MemoryUsed: metriccache.MemoryMetric{MemoryWithoutCache: *resource.NewQuantity(70000000000, resource.BinarySI)},
},
expectPodMetric: &metriccache.PodResourceMetric{
PodUID: "test_pod",
CPUUsed: metriccache.CPUMetric{CPUUsed: *resource.NewMilliQuantity(16000, resource.DecimalSI)},
MemoryUsed: metriccache.MemoryMetric{MemoryWithoutCache: *resource.NewQuantity(70000000000, resource.BinarySI)},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
metricCache, _ := metriccache.NewCacheNotShareMetricCache(&metriccache.Config{MetricGCIntervalSeconds: 60, MetricExpireSeconds: 60})
for _, metricInfo := range tt.metricInfos {
_ = metricCache.InsertNodeResourceMetric(metricInfo.timestamp, metricInfo.nodeResUsed)
_ = metricCache.InsertPodResourceMetric(metricInfo.timestamp, metricInfo.podResUsed)
}
ctl := gomock.NewController(t)
defer ctl.Finish()
mockstatesinformer := mock_statesinformer.NewMockStatesInformer(ctl)
mockstatesinformer.EXPECT().GetAllPods().Return([]*statesinformer.PodMeta{tt.pod}).AnyTimes()
metricsQuery := NewMetricsQuery(metricCache, mockstatesinformer)
gotNodeMetric, gotPodMetrics := metricsQuery.CollectNodeAndPodMetricLast(60)
assert.Equal(t, tt.expectNodeMetric, gotNodeMetric)
if tt.expectPodMetric == nil {
assert.True(t, len(gotPodMetrics) == 0)
} else {
assert.Equal(t, tt.expectPodMetric, gotPodMetrics[0])
}
})
}
}
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package plugins
import (
"github.com/koordinator-sh/koordinator/pkg/koordlet/metriccache"
"github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager/k8s"
"github.com/koordinator-sh/koordinator/pkg/koordlet/qosmanager/metricsquery"
"github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer"
)
type PluginContext struct {
K8sClient k8s.K8sClient
StatesInformer statesinformer.StatesInformer
MetricCache metriccache.MetricCache
MetricsQuery metricsquery.MetricsQuery
// Extra custom configuration for plugin.
ExtraConfig *string
}
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package plugins
import (
"k8s.io/component-base/featuregate"
)
// Plugin interface contains methods which must be implemented by all plugins.
type Plugin interface {
// Name is the name of plugin, must be unique.
Name() string
// Start is called to run this plugin.
Start() error
// Stop is called to stop this plugin.
Stop() error
// Feature returns feature name of this plugin.
Feature() featuregate.Feature
}
type PluginFactoryFn func(ctx *PluginContext) Plugin
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment