Commit 2f38aeb1 authored by xieydd's avatar xieydd
Browse files

implement replicas recommendation

parent 61a7fa7b
Showing with 578 additions and 148 deletions
+578 -148
......@@ -236,4 +236,4 @@ ifeq (, $(shell which mockgen))
GO_MOCKGEN=$(shell go env GOPATH)/bin/mockgen
else
GO_MOCKGEN=$(shell which mockgen)
endif
\ No newline at end of file
endif
......@@ -3,10 +3,15 @@ package app
import (
"context"
"flag"
recommender "github.com/gocrane/crane/pkg/recommendation"
"os"
"strings"
recommendationctrl "github.com/gocrane/crane/pkg/controller/recommendation"
"github.com/gocrane/crane/pkg/recommendation"
"github.com/gocrane/crane/pkg/recommendation/config"
recommender "github.com/gocrane/crane/pkg/recommendation/recommender"
"github.com/gocrane/crane/pkg/recommendation/replicas"
"github.com/spf13/cobra"
"golang.org/x/sync/errgroup"
"k8s.io/apimachinery/pkg/runtime"
......@@ -30,7 +35,6 @@ import (
"github.com/gocrane/crane/pkg/controller/cnp"
"github.com/gocrane/crane/pkg/controller/ehpa"
"github.com/gocrane/crane/pkg/controller/evpa"
"github.com/gocrane/crane/pkg/controller/recommendation"
"github.com/gocrane/crane/pkg/controller/timeseriesprediction"
"github.com/gocrane/crane/pkg/features"
"github.com/gocrane/crane/pkg/known"
......@@ -112,7 +116,11 @@ func Run(ctx context.Context, opts *options.Options) error {
predictorMgr := initPredictorManager(opts, realtimeDataSources, histroyDataSources)
recommenders, err := initRecommenders(opts)
recommenderMgr := initRecommenderManager(recommenders)
if err != nil {
klog.Error(err, "failed to init recommenders")
return err
}
recommenderMgr := initRecommenderManager(recommenders, realtimeDataSources, histroyDataSources)
initScheme()
initWebhooks(mgr, opts)
......@@ -124,12 +132,26 @@ func Run(ctx context.Context, opts *options.Options) error {
return nil
}
func initRecommenders(opts *options.Options) ([]recommender.Recommender, error) {
return recommender.GetRecommendersFromConfiguration(opts.RecommendationConfiguration)
func initRecommenders(opts *options.Options) (map[string]recommender.Recommender, error) {
apiRecommenders, err := config.GetRecommendersFromConfiguration(opts.RecommendationConfiguration)
if err != nil {
return nil, err
}
recommenders := make(map[string]recommender.Recommender, len(apiRecommenders))
for _, r := range apiRecommenders {
switch r.Name {
case recommender.ReplicasRecommender:
recommenders[recommender.ReplicasRecommender] = replicas.NewReplicasRecommender(r)
default:
recommenders[recommender.ReplicasRecommender] = replicas.NewReplicasRecommender(r)
}
}
return recommenders, nil
}
func initRecommenderManager(recommenders []recommender.Recommender) recommender.RecommenderManager {
return recommender.NewRecommenderManager(recommenders)
func initRecommenderManager(recommenders map[string]recommender.Recommender, realtimeDataSources map[providers.DataSourceType]providers.RealTime, historyDataSources map[providers.DataSourceType]providers.History) recommendation.RecommenderManager {
return recommendation.NewRecommenderManager(recommenders, realtimeDataSources, historyDataSources)
}
func initScheme() {
......@@ -214,7 +236,7 @@ func initPredictorManager(opts *options.Options, realtimeDataSources map[provide
}
// initControllers setup controllers with manager
func initControllers(ctx context.Context, mgr ctrl.Manager, opts *options.Options, predictorMgr predictor.Manager, recommenderMgr recommender.RecommenderManager, historyDataSource providers.History) {
func initControllers(ctx context.Context, mgr ctrl.Manager, opts *options.Options, predictorMgr predictor.Manager, recommenderMgr recommendation.RecommenderManager, historyDataSource providers.History) {
discoveryClientSet, err := discovery.NewDiscoveryClientForConfig(mgr.GetConfig())
if err != nil {
klog.Exit(err, "Unable to create discover client")
......@@ -298,6 +320,7 @@ func initControllers(ctx context.Context, mgr ctrl.Manager, opts *options.Option
}
}
// TODO(qmhu), change feature gate from analysis to recommendation
if utilfeature.DefaultMutableFeatureGate.Enabled(features.CraneAnalysis) {
if err := (&analytics.Controller{
Client: mgr.GetClient(),
......@@ -312,7 +335,7 @@ func initControllers(ctx context.Context, mgr ctrl.Manager, opts *options.Option
klog.Exit(err, "unable to create controller", "controller", "AnalyticsController")
}
if err := (&recommendation.RecommendationController{
if err := (&recommendationctrl.RecommendationController{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
RestMapper: mgr.GetRESTMapper(),
......@@ -320,6 +343,18 @@ func initControllers(ctx context.Context, mgr ctrl.Manager, opts *options.Option
}).SetupWithManager(mgr); err != nil {
klog.Exit(err, "unable to create controller", "controller", "RecommendationController")
}
if err := (&recommendationctrl.RecommendationRuleController{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
RestMapper: mgr.GetRESTMapper(),
RecommenderMgr: recommenderMgr,
ScaleClient: scaleClient,
Provider: historyDataSource,
Recorder: mgr.GetEventRecorderFor("recommendationrule-controller"),
}).SetupWithManager(mgr); err != nil {
klog.Exit(err, "unable to create controller", "controller", "RecommendationRuleController")
}
}
// CnpController
......
......@@ -49,7 +49,7 @@ spec:
- --prometheus-address=PROMETHEUS_ADDRESS
- --feature-gates=Analysis=true,TimeSeriesPrediction=true,Autoscaling=true
- --recommendation-config-file=/tmp/recommendation-config/config_set.yaml
- --recommendation-configiguration-file=/tmp/recommendation-framework/recommendation_configuration.yaml
- --recommendation-configuration-file=/tmp/recommendation-framework/recommendation_configuration.yaml
- -v=4
volumeMounts:
- mountPath: /tmp/k8s-webhook-server/serving-certs
......
......@@ -4,7 +4,7 @@ go 1.17
require (
github.com/go-echarts/go-echarts/v2 v2.2.4
github.com/gocrane/api v0.6.1-0.20220721081535-2cf15fc58bf3
github.com/gocrane/api v0.6.1-0.20220809112454-68f0199a774e
github.com/google/cadvisor v0.39.2
github.com/mjibson/go-dsp v0.0.0-20180508042940-11479a337f12
github.com/prometheus/client_golang v1.11.0
......@@ -171,6 +171,7 @@ require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/fsnotify/fsnotify v1.5.1
github.com/json-iterator/go v1.1.12 // indirect
github.com/lithammer/fuzzysearch v1.1.5
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/robfig/cron/v3 v3.0.1
github.com/spf13/afero v1.6.0 // indirect
......@@ -181,7 +182,6 @@ require (
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/tools v0.1.8 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
k8s.io/klog v0.3.0
)
replace (
......
......@@ -310,8 +310,8 @@ github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.1.0-rc.5 h1:QOAag7FoBaBYYHRqzqkhhd8fq5RTubvI4v3Ft/gDVVQ=
github.com/gobwas/ws v1.1.0-rc.5/go.mod h1:nzvNcVha5eUziGrbxFCo6qFIojQHjJV5cLYIbezhfL0=
github.com/gocrane/api v0.6.1-0.20220721081535-2cf15fc58bf3 h1:cKnH9KoQzzBg3/bzYUfRZAhFW7rndswc+TB0rj+uzng=
github.com/gocrane/api v0.6.1-0.20220721081535-2cf15fc58bf3/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
github.com/gocrane/api v0.6.1-0.20220809112454-68f0199a774e h1:pIocbZM7LchSMG7XBbfD9K+Im7zZtMZjVU7paVJOv6I=
github.com/gocrane/api v0.6.1-0.20220809112454-68f0199a774e/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
......@@ -509,6 +509,8 @@ github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgx
github.com/libopenstorage/openstorage v1.0.0/go.mod h1:Sp1sIObHjat1BeXhfMqLZ14wnOzEhNx2YQedreMcUyc=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
github.com/lithammer/dedent v1.1.0/go.mod h1:jrXYCQtgg0nJiN+StA2KgR7w6CiQNv9Fd/Z9BP0jIOc=
github.com/lithammer/fuzzysearch v1.1.5 h1:Ag7aKU08wp0R9QCfF4GoGST9HbmAIeLP7xwMrOBEp1c=
github.com/lithammer/fuzzysearch v1.1.5/go.mod h1:1R1LRNk7yKid1BaQkmuLQaHruxcC4HmAH30Dh61Ih1Q=
github.com/lpabon/godbc v0.1.1/go.mod h1:Jo9QV0cf3U6jZABgiJ2skINAXb9j8m51r07g4KI92ZA=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
......
......@@ -3,6 +3,7 @@ package recommendation
import (
"context"
"fmt"
predictormgr "github.com/gocrane/crane/pkg/predictor"
"k8s.io/klog/v2"
......
......@@ -8,6 +8,12 @@ import (
"sync"
"time"
"github.com/gocrane/crane/pkg/controller/analytics"
"github.com/gocrane/crane/pkg/providers"
recommender "github.com/gocrane/crane/pkg/recommendation"
"github.com/gocrane/crane/pkg/recommendation/framework"
analysisv1alph1 "github.com/gocrane/api/analysis/v1alpha1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
......@@ -28,33 +34,26 @@ import (
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/yaml"
analysisv1alph1 "github.com/gocrane/api/analysis/v1alpha1"
"github.com/gocrane/crane/pkg/known"
predictormgr "github.com/gocrane/crane/pkg/predictor"
"github.com/gocrane/crane/pkg/providers"
"github.com/gocrane/crane/pkg/recommend"
"github.com/gocrane/crane/pkg/utils"
)
type Controller struct {
type RecommendationRuleController struct {
client.Client
ConfigSet *analysisv1alph1.ConfigSet
Scheme *runtime.Scheme
Recorder record.EventRecorder
RestMapper meta.RESTMapper
ScaleClient scale.ScalesGetter
PredictorMgr predictormgr.Manager
Provider providers.History
RecommenderMgr recommender.RecommenderManager
kubeClient kubernetes.Interface
dynamicClient dynamic.Interface
discoveryClient discovery.DiscoveryInterface
K8SVersion *version.Version
Provider providers.History
}
func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
func (c *RecommendationRuleController) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
klog.V(4).InfoS("Got a RecommendationRule resource.", "RecommendationRule", req.NamespacedName)
recommendationRule := &analysisv1alph1.RecommendationRule{}
......@@ -108,7 +107,7 @@ func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
return ctrl.Result{RequeueAfter: time.Second * 1}, nil
}
func (c *Controller) doReconcile(ctx context.Context, recommendationRule *analysisv1alph1.RecommendationRule, interval time.Duration) bool {
func (c *RecommendationRuleController) doReconcile(ctx context.Context, recommendationRule *analysisv1alph1.RecommendationRule, interval time.Duration) bool {
newStatus := recommendationRule.Status.DeepCopy()
identities, err := c.getIdentities(ctx, recommendationRule)
......@@ -137,11 +136,17 @@ func (c *Controller) doReconcile(ctx context.Context, recommendationRule *analys
}
if currMissions == nil {
// create recommendation missions for this round
// create recommendation rule missions for this round
// every recommendation rule have multi recommender for one identity
for _, id := range identities {
currMissions = append(currMissions, analysisv1alph1.RecommendationMission{
TargetRef: id.GetObjectReference(),
})
for _, recommender := range recommendationRule.Spec.Recommenders {
currMissions = append(currMissions, analysisv1alph1.RecommendationMission{
TargetRef: id.GetObjectReference(),
RecommenderRef: analysisv1alph1.Recommender{
Name: recommender.Name,
},
})
}
}
}
......@@ -232,7 +237,7 @@ func (c *Controller) doReconcile(ctx context.Context, recommendationRule *analys
return finished
}
func (c *Controller) CreateRecommendationObject(recommendationRule *analysisv1alph1.RecommendationRule,
func (c *RecommendationRuleController) CreateRecommendationObject(recommendationRule *analysisv1alph1.RecommendationRule,
target corev1.ObjectReference, id ObjectIdentity, recommenderName string) *analysisv1alph1.Recommendation {
recommendation := &analysisv1alph1.Recommendation{
......@@ -261,7 +266,7 @@ func (c *Controller) CreateRecommendationObject(recommendationRule *analysisv1al
return recommendation
}
func (c *Controller) SetupWithManager(mgr ctrl.Manager) error {
func (c *RecommendationRuleController) SetupWithManager(mgr ctrl.Manager) error {
c.kubeClient = kubernetes.NewForConfigOrDie(mgr.GetConfig())
c.discoveryClient = discovery.NewDiscoveryClientForConfigOrDie(mgr.GetConfig())
c.dynamicClient = dynamic.NewForConfigOrDie(mgr.GetConfig())
......@@ -277,7 +282,7 @@ func (c *Controller) SetupWithManager(mgr ctrl.Manager) error {
Complete(c)
}
func (c *Controller) getIdentities(ctx context.Context, recommendationRule *analysisv1alph1.RecommendationRule) (map[string]ObjectIdentity, error) {
func (c *RecommendationRuleController) getIdentities(ctx context.Context, recommendationRule *analysisv1alph1.RecommendationRule) (map[string]ObjectIdentity, error) {
identities := map[string]ObjectIdentity{}
for _, rs := range recommendationRule.Spec.ResourceSelectors {
......@@ -357,7 +362,7 @@ func (c *Controller) getIdentities(ctx context.Context, recommendationRule *anal
return identities, nil
}
func (c *Controller) executeMission(ctx context.Context, wg *sync.WaitGroup, recommendationRule *analysisv1alph1.RecommendationRule, identities map[string]ObjectIdentity, mission *analysisv1alph1.RecommendationMission, existingRecommendation *analysisv1alph1.Recommendation, timeNow metav1.Time) {
func (c *RecommendationRuleController) executeMission(ctx context.Context, wg *sync.WaitGroup, recommendationRule *analysisv1alph1.RecommendationRule, identities map[string]ObjectIdentity, mission *analysisv1alph1.RecommendationMission, existingRecommendation *analysisv1alph1.Recommendation, timeNow metav1.Time) {
defer func() {
mission.LastStartTime = &timeNow
klog.Infof("Mission message: %s", mission.Message)
......@@ -372,32 +377,27 @@ func (c *Controller) executeMission(ctx context.Context, wg *sync.WaitGroup, rec
} else {
recommendation := existingRecommendation
if recommendation == nil {
recommendation = c.CreateRecommendationObject(recommendationRule, mission.TargetRef, id, "")
recommendation = c.CreateRecommendationObject(recommendationRule, mission.TargetRef, id, mission.RecommenderRef.Name)
}
// do recommendation
//recommender, err := recommend.NewRecommender(c.Client, c.RestMapper, c.ScaleClient, recommendation, c.PredictorMgr, c.Provider, c.configSet, analytics.Spec.Config)
//if err != nil {
// mission.Message = fmt.Sprintf("Failed to create recommender, Recommendation %s error %v", klog.KObj(recommendation), err)
// return
//}
var recommender recommend.Recommender
proposed, err := recommender.Offer()
if err != nil {
mission.Message = fmt.Sprintf("Failed to offer recommendation: %s", err.Error())
return
r := c.RecommenderMgr.GetRecommender(mission.RecommenderRef.Name)
p := make(map[providers.DataSourceType]providers.History)
p[providers.PrometheusDataSource] = c.Provider
identity := analytics.ObjectIdentity{
Namespace: identities[k].Namespace,
Name: identities[k].Name,
Kind: identities[k].Kind,
APIVersion: identities[k].APIVersion,
Labels: identities[k].Labels,
}
recommendationContext := framework.NewRecommendationContext(ctx, identity, p, recommendation, c.Client)
err := recommender.Run(&recommendationContext, r)
var value string
valueBytes, err := yaml.Marshal(proposed)
if err != nil {
mission.Message = err.Error()
mission.Message = fmt.Sprintf("Failed to run recommendation flow in recommender %s: %s", r.Name(), err.Error())
return
}
value = string(valueBytes)
recommendation.Status.RecommendedValue = value
recommendation.Status.LastUpdateTime = &timeNow
if existingRecommendation != nil {
klog.Infof("Update recommendation %s", klog.KObj(recommendation))
......@@ -426,7 +426,7 @@ func (c *Controller) executeMission(ctx context.Context, wg *sync.WaitGroup, rec
}
}
func (c *Controller) UpdateStatus(ctx context.Context, recommendationRule *analysisv1alph1.RecommendationRule, newStatus *analysisv1alph1.RecommendationRuleStatus) {
func (c *RecommendationRuleController) UpdateStatus(ctx context.Context, recommendationRule *analysisv1alph1.RecommendationRule, newStatus *analysisv1alph1.RecommendationRuleStatus) {
if !equality.Semantic.DeepEqual(&recommendationRule.Status, newStatus) {
recommendationRule.Status = *newStatus
err := c.Update(ctx, recommendationRule)
......
......@@ -3,6 +3,8 @@ package metricnaming
import (
"github.com/gocrane/crane/pkg/metricquery"
"github.com/gocrane/crane/pkg/querybuilder"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
)
// MetricNamer is an interface. it is the bridge between predictor and different data sources and other component such as caller.
......@@ -55,3 +57,40 @@ func NewQueryBuilder(metric *metricquery.Metric) querybuilder.QueryBuilder {
metric: metric,
}
}
func ResourceToWorkloadMetricNamer(target *corev1.ObjectReference, resourceName *corev1.ResourceName, workloadLabelSelector labels.Selector, caller string) MetricNamer {
// workload
return &GeneralMetricNamer{
CallerName: caller,
Metric: &metricquery.Metric{
Type: metricquery.WorkloadMetricType,
MetricName: resourceName.String(),
Workload: &metricquery.WorkloadNamerInfo{
Namespace: target.Namespace,
Kind: target.Kind,
APIVersion: target.APIVersion,
Name: target.Name,
Selector: workloadLabelSelector,
},
},
}
}
func ResourceToContainerMetricNamer(namespace, apiVersion, workloadKind, workloadName, containerName string, resourceName corev1.ResourceName, caller string) MetricNamer {
// container
return &GeneralMetricNamer{
CallerName: caller,
Metric: &metricquery.Metric{
Type: metricquery.ContainerMetricType,
MetricName: resourceName.String(),
Container: &metricquery.ContainerNamerInfo{
Namespace: namespace,
APIVersion: apiVersion,
WorkloadKind: workloadKind,
WorkloadName: workloadName,
Name: containerName,
Selector: labels.Everything(),
},
},
}
}
......@@ -58,4 +58,11 @@ const (
PrometheusDataSource DataSourceType = "prom"
MetricServerDataSource DataSourceType = "metricserver"
GrpcDataSource DataSourceType = "grpc"
DataSourceTypeKey string = "data-source-type"
)
var PrometheusConfigKeys = []string{"prometheus-address", "prometheus-auth-username", "prometheus-auth-password",
"prometheus-auth-bearertoken", "prometheus-query-concurrency", "prometheus-insecure-skip-verify",
"prometheus-keepalive", "prometheus-timeout", "prometheus-bratelimit", "prometheus-maxpoints"}
var GrpcConfigKeys = []string{"grpc-ds-address", "grpc-ds-timeout"}
......@@ -6,6 +6,8 @@ import (
"strconv"
"time"
"github.com/gocrane/crane/pkg/metricnaming"
"github.com/montanaflynn/stats"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/autoscaling/v1"
......@@ -18,8 +20,6 @@ import (
predictionapi "github.com/gocrane/api/prediction/v1alpha1"
"github.com/gocrane/crane/pkg/common"
"github.com/gocrane/crane/pkg/metricnaming"
"github.com/gocrane/crane/pkg/metricquery"
"github.com/gocrane/crane/pkg/metrics"
"github.com/gocrane/crane/pkg/prediction/config"
"github.com/gocrane/crane/pkg/recommend/types"
......@@ -54,7 +54,7 @@ func (a *ReplicasAdvisor) Advise(proposed *types.ProposedRecommendation) error {
return err
}
caller := fmt.Sprintf(callerFormat, klog.KObj(a.Recommendation), a.Recommendation.UID)
metricNamer := ResourceToWorkloadMetricNamer(target, &resourceCpu, labelSelector, caller)
metricNamer := metricnaming.ResourceToWorkloadMetricNamer(target, &resourceCpu, labelSelector, caller)
if err := metricNamer.Validate(); err != nil {
return err
}
......@@ -304,7 +304,7 @@ func (a *ReplicasAdvisor) proposeTargetUtilization() (int32, int64, error) {
// use percentile algo to get the 99 percentile cpu usage for this target
for _, container := range a.PodTemplate.Spec.Containers {
caller := fmt.Sprintf(callerFormat, klog.KObj(a.Recommendation), a.Recommendation.UID)
metricNamer := ResourceToContainerMetricNamer(a.Recommendation.Spec.TargetRef.Namespace, a.Recommendation.Spec.TargetRef.APIVersion,
metricNamer := metricnaming.ResourceToContainerMetricNamer(a.Recommendation.Spec.TargetRef.Namespace, a.Recommendation.Spec.TargetRef.APIVersion,
a.Recommendation.Spec.TargetRef.Kind, a.Recommendation.Spec.TargetRef.Name, container.Name, corev1.ResourceCPU, caller)
cpuConfig := makeCpuConfig(a.ConfigProperties)
tsList, err := utils.QueryPredictedValuesOnce(a.Recommendation,
......@@ -430,21 +430,3 @@ func GetTargetLabelSelector(target *corev1.ObjectReference, scale *v1.Scale, ds
return nil, fmt.Errorf("no daemonset specified")
}
}
func ResourceToWorkloadMetricNamer(target *corev1.ObjectReference, resourceName *corev1.ResourceName, workloadLabelSelector labels.Selector, caller string) metricnaming.MetricNamer {
// workload
return &metricnaming.GeneralMetricNamer{
CallerName: caller,
Metric: &metricquery.Metric{
Type: metricquery.WorkloadMetricType,
MetricName: resourceName.String(),
Workload: &metricquery.WorkloadNamerInfo{
Namespace: target.Namespace,
Kind: target.Kind,
APIVersion: target.APIVersion,
Name: target.Name,
Selector: workloadLabelSelector,
},
},
}
}
......@@ -3,15 +3,14 @@ package advisor
import (
"fmt"
"github.com/gocrane/crane/pkg/metricnaming"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"
predictionapi "github.com/gocrane/api/prediction/v1alpha1"
"github.com/gocrane/crane/pkg/metricnaming"
"github.com/gocrane/crane/pkg/metricquery"
"github.com/gocrane/crane/pkg/metrics"
"github.com/gocrane/crane/pkg/prediction/config"
"github.com/gocrane/crane/pkg/recommend/types"
......@@ -128,7 +127,7 @@ func (a *ResourceRequestAdvisor) Advise(proposed *types.ProposedRecommendation)
}
caller := fmt.Sprintf(callerFormat, klog.KObj(a.Recommendation), a.Recommendation.UID)
metricNamer := ResourceToContainerMetricNamer(namespace, a.Recommendation.Spec.TargetRef.APIVersion,
metricNamer := metricnaming.ResourceToContainerMetricNamer(namespace, a.Recommendation.Spec.TargetRef.APIVersion,
a.Recommendation.Spec.TargetRef.Kind, a.Recommendation.Spec.TargetRef.Name, c.Name, corev1.ResourceCPU, caller)
klog.V(6).Infof("CPU query for resource request recommendation: %s", metricNamer.BuildUniqueKey())
cpuConfig := makeCpuConfig(a.ConfigProperties)
......@@ -145,7 +144,7 @@ func (a *ResourceRequestAdvisor) Advise(proposed *types.ProposedRecommendation)
// export recommended values as prom metrics
a.recordResourceRecommendation(c.Name, corev1.ResourceCPU, q)
metricNamer = ResourceToContainerMetricNamer(namespace, a.Recommendation.Spec.TargetRef.APIVersion,
metricNamer = metricnaming.ResourceToContainerMetricNamer(namespace, a.Recommendation.Spec.TargetRef.APIVersion,
a.Recommendation.Spec.TargetRef.Kind, a.Recommendation.Spec.TargetRef.Name, c.Name, corev1.ResourceMemory, caller)
klog.V(6).Infof("Memory query for resource request recommendation: %s", metricNamer.BuildUniqueKey())
memConfig := makeMemConfig(a.ConfigProperties)
......@@ -200,22 +199,3 @@ func (a *ResourceRequestAdvisor) recordResourceRecommendation(containerName stri
func (a *ResourceRequestAdvisor) Name() string {
return "ResourceRequestAdvisor"
}
func ResourceToContainerMetricNamer(namespace, apiVersion, workloadKind, workloadName, containerName string, resourceName corev1.ResourceName, caller string) metricnaming.MetricNamer {
// container
return &metricnaming.GeneralMetricNamer{
CallerName: caller,
Metric: &metricquery.Metric{
Type: metricquery.ContainerMetricType,
MetricName: resourceName.String(),
Container: &metricquery.ContainerNamerInfo{
Namespace: namespace,
APIVersion: apiVersion,
WorkloadKind: workloadKind,
WorkloadName: workloadName,
Name: containerName,
Selector: labels.Everything(),
},
},
}
}
package recommendation
package config
import (
"encoding/json"
"fmt"
"github.com/gocrane/crane/pkg/recommendation/recommender/apis"
"github.com/gocrane/crane/pkg/recommendation/replicas"
"io/ioutil"
"k8s.io/klog"
"github.com/gocrane/crane/pkg/recommendation/recommender/apis"
klog "k8s.io/klog/v2"
)
func LoadRecommenderConfigFromFile(filePath string) (*apis.RecommenderConfiguration, error) {
......@@ -37,7 +37,7 @@ func loadConfigFromBytes(buf []byte) (*apis.RecommenderConfiguration, error) {
return config, nil
}
func GetRecommendersFromConfiguration(file string) ([]Recommender, error) {
func GetRecommendersFromConfiguration(file string) ([]apis.Recommender, error) {
config, err := LoadRecommenderConfigFromFile(file)
if err != nil {
klog.Errorf("load recommender configuration failed, %v", err)
......@@ -45,14 +45,36 @@ func GetRecommendersFromConfiguration(file string) ([]Recommender, error) {
}
configRecommenders := config.Recommenders
recommenders := make([]Recommender, len(configRecommenders))
for _, recommender := range configRecommenders {
switch recommender.Name {
case ReplicasRecommender:
recommenders = append(recommenders, replicas.NewReplicasRecommender(recommender))
default:
recommenders = append(recommenders, replicas.NewReplicasRecommender(recommender))
}
recommenders := make([]apis.Recommender, len(configRecommenders))
for _, r := range configRecommenders {
recommenders = append(recommenders, r)
}
return recommenders, nil
}
func GetKeysOfMap(m map[string]string) (keys []string) {
for k := range m {
keys = append(keys, k)
}
return
}
func SlicesContainSlice(src []string, target []string) bool {
contain := true
for _, value := range target {
if !contains(src, value) {
contain = false
}
}
return contain
}
func contains(s []string, str string) bool {
for _, v := range s {
if v == str {
return true
}
}
return false
}
......@@ -2,10 +2,16 @@ package framework
import (
"context"
"github.com/gocrane/api/analysis/v1alpha1"
"github.com/gocrane/crane/pkg/common"
"github.com/gocrane/crane/pkg/controller/analytics"
"github.com/gocrane/crane/pkg/metricnaming"
"github.com/gocrane/crane/pkg/prediction/config"
predictormgr "github.com/gocrane/crane/pkg/predictor"
"github.com/gocrane/crane/pkg/providers"
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)
type RecommendationContext struct {
......@@ -13,28 +19,46 @@ type RecommendationContext struct {
// The kubernetes resource object reference of recommendation flow.
Identity analytics.ObjectIdentity
// Time series data from data source.
Values *common.TimeSeries
InputValues []*common.TimeSeries
// Result series from prediction
ResultValues []*common.TimeSeries
// DataProviders contains data source of your recommendation flow.
DataProviders map[providers.DataSourceType]providers.Interface
DataProviders map[providers.DataSourceType]providers.History
// Recommendation store result of recommendation flow.
Recommendation v1alpha1.Recommendation
Recommendation *v1alpha1.Recommendation
// When cancel channel accept signal indicates that the context has been canceled. The recommendation should stop executing as soon as possible.
CancelCh <-chan struct{}
// CancelCh <-chan struct{}
// RecommendationRule for the context
RecommendationRule v1alpha1.RecommendationRule
// metrics namer for datasource provider
MetricNamer metricnaming.MetricNamer
// Algorithm Config
AlgorithmConfig *config.Config
// Manager of predict algorithm
PredictorMgr predictormgr.Manager
// Pod template
PodTemplate *v1.PodTemplateSpec
// Client
Client client.Client
}
func NewRecommendationContext(context context.Context, identity analytics.ObjectIdentity, dataProviders map[providers.DataSourceType]providers.Interface) RecommendationContext {
func NewRecommendationContext(context context.Context, identity analytics.ObjectIdentity, dataProviders map[providers.DataSourceType]providers.History, recommendation *v1alpha1.Recommendation, client client.Client) RecommendationContext {
return RecommendationContext{
Identity: identity,
Context: context,
DataProviders: dataProviders,
Identity: identity,
Context: context,
DataProviders: dataProviders,
Recommendation: recommendation,
Client: client,
//CancelCh: context.Done(),
}
}
func (ctx *RecommendationContext) Canceled() bool {
select {
case <-ctx.CancelCh:
return true
default:
return false
}
}
//func (ctx *RecommendationContext) Canceled() bool {
// select {
// case <-ctx.CancelCh:
// return true
// default:
// return false
// }
//}
package framework
// PreRecommend interface
type PreRecommend interface {
PreRecommend(ctx *RecommendationContext) error
}
// Recommend interface
type Recommend interface {
Recommend(ctx *RecommendationContext) error
......
package recommendation
import (
"sync"
"github.com/gocrane/crane/pkg/providers"
"github.com/gocrane/crane/pkg/recommendation/framework"
"github.com/gocrane/crane/pkg/recommendation/recommender"
"k8s.io/klog/v2"
)
type RecommenderManager interface {
// GetRecommender return a registered recommender
GetRecommender(recommenderName string) recommender.Recommender
}
func NewRecommenderManager(recommenders []Recommender) RecommenderManager {
func NewRecommenderManager(recommenders map[string]recommender.Recommender, realtimeDataSources map[providers.DataSourceType]providers.RealTime, historyDataSources map[providers.DataSourceType]providers.History) RecommenderManager {
return &manager{
recommenders: recommenders,
recommenders: recommenders,
realtimeDataSources: realtimeDataSources,
historyDataSources: historyDataSources,
}
}
type manager struct {
recommenders []Recommender
lock sync.Mutex
recommenders map[string]recommender.Recommender
realtimeDataSources map[providers.DataSourceType]providers.RealTime
historyDataSources map[providers.DataSourceType]providers.History
}
func (m *manager) GetRecommender(recommenderName string) recommender.Recommender {
m.lock.Lock()
defer m.lock.Unlock()
return m.recommenders[recommenderName]
}
func Run(ctx *framework.RecommendationContext, recommender recommender.Recommender) error {
//// If context is canceled, directly return.
//if ctx.Canceled() {
// klog.Infof("Recommender %q has been cancelled...", recommender.Name())
// return nil
//}
// 1. Filter phase
err := recommender.Filter(ctx)
if err != nil {
klog.Errorf("recommender %q failed at filter phase!", recommender.Name())
return err
}
// 2. PrePrepare phase
err = recommender.CheckDataProviders(ctx)
if err != nil {
klog.Errorf("recommender %q failed at prepare check data provider phase!", recommender.Name())
return err
}
// 3. Prepare phase
err = recommender.CollectData(ctx)
if err != nil {
klog.Errorf("recommender %q failed at prepare collect data phase!", recommender.Name())
return err
}
// 4. PostPrepare phase
err = recommender.PostProcessing(ctx)
if err != nil {
klog.Errorf("recommender %q failed at prepare data post processing phase!", recommender.Name())
return err
}
// 5. PreRecommend phase
err = recommender.PreRecommend(ctx)
if err != nil {
klog.Errorf("recommender %q failed at pre commend phase!", recommender.Name())
return err
}
// 6. Recommend phase
err = recommender.Recommend(ctx)
if err != nil {
klog.Errorf("recommender %q failed at recommend phase!", recommender.Name())
return err
}
// 7. PostRecommend phase, add policy
err = recommender.Policy(ctx)
if err != nil {
klog.Errorf("recommender %q failed at recommend policy phase!", recommender.Name())
return err
}
// 8. Observe phase
err = recommender.Observe(ctx)
if err != nil {
klog.Errorf("recommender %q failed at observe phase!", recommender.Name())
return err
}
return nil
}
package recommendation
package recommender
const (
// ReplicasRecommender name
......
package recommendation
package recommender
import (
"github.com/gocrane/crane/pkg/recommendation/framework"
......@@ -6,11 +6,11 @@ import (
type Recommender interface {
Name() string
Run(ctx *framework.RecommendationContext)
framework.Filter
framework.PrePrepare
framework.Prepare
framework.PostPrepare
framework.PreRecommend
framework.Recommend
framework.PostRecommend
framework.Observe
......
......@@ -2,13 +2,19 @@ package replicas
import (
"fmt"
"reflect"
"github.com/gocrane/api/analysis/v1alpha1"
"github.com/gocrane/crane/pkg/controller/analytics"
"github.com/gocrane/crane/pkg/metricnaming"
"github.com/gocrane/crane/pkg/recommendation/framework"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"reflect"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/klog/v2"
)
// Filter out k8s resources that are not supported by the recommender.
func (rr *ReplicasRecommender) Filter(ctx *framework.RecommendationContext) error {
// 1. get object identity
identity := ctx.Identity
......@@ -17,14 +23,26 @@ func (rr *ReplicasRecommender) Filter(ctx *framework.RecommendationContext) erro
accepted := rr.Recommender.AcceptedResourceSelectors
// 3. if not support, abort the recommendation flow
supported := IdentityIsSupported(identity, accepted)
supported := IsIdentitySupported(identity, accepted)
if !supported {
return fmt.Errorf("recommender %s is failed at fliter, your kubernetes resource is not supported for recommender %s.", rr.Name(), rr.Name())
}
// 4. generate metric
resourceCpu := corev1.ResourceCPU
target := &corev1.ObjectReference{}
labelSelector := labels.SelectorFromSet(ctx.Identity.Labels)
caller := fmt.Sprintf(rr.Name(), klog.KObj(&ctx.RecommendationRule), ctx.RecommendationRule.UID)
metricNamer := metricnaming.ResourceToWorkloadMetricNamer(target, &resourceCpu, labelSelector, caller)
if err := metricNamer.Validate(); err != nil {
return err
}
ctx.MetricNamer = metricNamer
return nil
}
func IdentityIsSupported(identity analytics.ObjectIdentity, selectors []v1alpha1.ResourceSelector) bool {
// IsIdentitySupported check weather object identity fit resource selector.
func IsIdentitySupported(identity analytics.ObjectIdentity, selectors []v1alpha1.ResourceSelector) bool {
supported := false
for _, selector := range selectors {
......@@ -32,7 +50,7 @@ func IdentityIsSupported(identity analytics.ObjectIdentity, selectors []v1alpha1
Name: identity.Name,
APIVersion: identity.APIVersion,
Kind: identity.Kind,
LabelSelector: v1.LabelSelector{
LabelSelector: &metav1.LabelSelector{
MatchLabels: identity.Labels,
},
}
......
package replicas
import "github.com/gocrane/crane/pkg/recommendation/framework"
import (
"encoding/json"
"fmt"
jsonpatch "github.com/evanphx/json-patch"
"github.com/gocrane/crane/pkg/recommend/types"
"github.com/gocrane/crane/pkg/recommendation/framework"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/yaml"
)
// Observe enhance the observability.
func (rr *ReplicasRecommender) Observe(ctx *framework.RecommendationContext) error {
key := client.ObjectKey{
Name: ctx.Identity.Name,
Namespace: ctx.Identity.Namespace,
}
unstructed := &unstructured.Unstructured{}
unstructed.SetAPIVersion(ctx.Identity.APIVersion)
unstructed.SetKind(ctx.Identity.Kind)
err := ctx.Client.Get(ctx.Context, key, unstructed)
if err != nil {
return err
}
oldObject, found, err := unstructured.NestedMap(unstructed.Object, "spec")
if !found || err != nil {
return fmt.Errorf("get spec from unstructed object %s failed. ", klog.KObj(unstructed))
}
result := ctx.Recommendation.Status.RecommendedValue
proposedRecommendation := types.ProposedRecommendation{}
err = yaml.Unmarshal([]byte(result), proposedRecommendation)
if err != nil {
return fmt.Errorf("decode replicas value from context error %s. ", err)
}
err = unstructured.SetNestedField(unstructed.Object, proposedRecommendation.ReplicasRecommendation.Replicas, "spec", "replicas")
if err != nil {
return fmt.Errorf("set replicas to spec failed %s. ", err)
}
newObject, found, err := unstructured.NestedMap(unstructed.Object, "spec")
if !found || err != nil {
return fmt.Errorf("get spec from unstructed object %s failed. ", klog.KObj(unstructed))
}
oldBytes, err := json.Marshal(oldObject)
if err != nil {
return fmt.Errorf("encode error %s. ", err)
}
newBytes, err := json.Marshal(newObject)
if err != nil {
return fmt.Errorf("encode error %s. ", err)
}
newPatch, err := jsonpatch.CreateMergePatch(newBytes, oldBytes)
if err != nil {
return fmt.Errorf("create merge patch error %s. ", err)
}
oldPatch, err := jsonpatch.CreateMergePatch(oldBytes, newBytes)
if err != nil {
return fmt.Errorf("create merge patch error %s. ", err)
}
ctx.Recommendation.Status.RecommendedInfo = string(newPatch)
ctx.Recommendation.Status.CurrentInfo = string(oldPatch)
// TODO(qmhu) Create action type.
ctx.Recommendation.Status.Action = "Patch"
return nil
}
package replicas
import (
"fmt"
"strconv"
"time"
"github.com/gocrane/crane/pkg/providers"
"github.com/gocrane/crane/pkg/providers/prom"
"github.com/gocrane/crane/pkg/recommendation/config"
"github.com/gocrane/crane/pkg/recommendation/framework"
"strings"
"github.com/lithammer/fuzzysearch/fuzzy"
"k8s.io/klog/v2"
)
// CheckDataProviders in PrePrepare phase, will create data source provider via your recommendation config.
func (rr *ReplicasRecommender) CheckDataProviders(ctx *framework.RecommendationContext) error {
// 1. load data provider from recommendation config, override the default data source
configSet := rr.Recommender.Config
for name, value := range configSet {
switch strings.ToLower(name) {
case string(providers.PrometheusDataSource):
promConfig := providers.PromConfig{}
provider, err := prom.NewProvider()
}
// replicas recommender only need history data provider
// History data source
// metricserver can't collect history data
// default is prometheus, you can override the provider to grpc or override the prometheus config
// TODO(xieydd) Load cache data source provider if config is not changed.
configKeys := config.GetKeysOfMap(configSet)
promKeys := fuzzy.FindFold(string(providers.PrometheusDataSource), configKeys)
dataSourceKeys := fuzzy.FindFold(providers.DataSourceTypeKey, configKeys)
//grpcKeys := fuzzy.FindFold(string(providers.GrpcDataSource), configKeys)
dataSourceType := dataSourceKeys[0]
if dataSourceType != string(providers.PrometheusDataSource) {
return fmt.Errorf("in replicas recommender, only suppport prometheus history data source")
}
if len(promKeys) != 0 {
return fmt.Errorf("in replicas recommender, you need set prometheus config %v for history data provider", providers.PrometheusConfigKeys)
}
mustSetConfig := []string{"prometheus-address", "prometheus-auth-username", "prometheus-auth-password", "prometheus-auth-bearertoken"}
if config.SlicesContainSlice(promKeys, mustSetConfig) {
return fmt.Errorf("in replicas recommender, you need set prometheus config %v for history data provider", mustSetConfig)
}
timeOut := 3 * time.Minute
if value, ok := configSet["prometheus-timeout"]; ok {
timeOut, _ = time.ParseDuration(value)
}
aliveTime := 60 * time.Second
if value, ok := configSet["prometheus-keepalive"]; ok {
aliveTime, _ = time.ParseDuration(value)
}
concurrency := 10
if value, ok := configSet["prometheus-query-concurrency"]; ok {
concurrency, _ = strconv.Atoi(value)
}
maxPoints := 11000
if value, ok := configSet["prometheus-maxpoints"]; ok {
maxPoints, _ = strconv.Atoi(value)
}
promConfig := providers.PromConfig{
Address: configSet["prometheus-address"],
Timeout: timeOut,
KeepAlive: aliveTime,
InsecureSkipVerify: configSet["prometheus-insecure-skip-verify"] == "true",
Auth: providers.ClientAuth{
Username: configSet["prometheus-auth-username"],
Password: configSet["prometheus-auth-password"],
BearerToken: configSet["prometheus-auth-bearertoken"],
},
QueryConcurrency: concurrency,
BRateLimit: configSet["prometheus-bratelimit"] == "true",
MaxPointsLimitPerTimeSeries: maxPoints,
}
promDataProvider, err := prom.NewProvider(&promConfig)
if err != nil {
return err
}
ctx.DataProviders = map[providers.DataSourceType]providers.History{
providers.PrometheusDataSource: promDataProvider,
}
// if no data provider config set, use default history data provider
/*
if len(dataSourceKeys) > 0 {
switch dataSourceType {
case string(providers.PrometheusDataSource):
if len(promKeys) != 0 {
return fmt.Errorf("in replicas recommender, you need set prometheus config %v for history data provider", providers.PrometheusConfigKeys)
}
mustSetConfig := []string{"prometheus-address", "prometheus-auth-username", "prometheus-auth-password", "prometheus-auth-bearertoken"}
if recommendation.SlicesContainSlice(promKeys, mustSetConfig) {
return fmt.Errorf("in replicas recommender, you need set prometheus config %v for history data provider", mustSetConfig)
}
timeOut := 3 * time.Minute
if value, ok := configSet["prometheus-timeout"]; ok {
timeOut, _ = time.ParseDuration(value)
}
aliveTime := 60 * time.Second
if value, ok := configSet["prometheus-keepalive"]; ok {
aliveTime, _ = time.ParseDuration(value)
}
concurrency := 10
if value, ok := configSet["prometheus-query-concurrency"]; ok {
concurrency, _ = strconv.Atoi(value)
}
maxPoints := 11000
if value, ok := configSet["prometheus-maxpoints"]; ok {
maxPoints, _ = strconv.Atoi(value)
}
promConfig := providers.PromConfig{
Address: configSet["prometheus-address"],
Timeout: timeOut,
KeepAlive: aliveTime,
InsecureSkipVerify: configSet["prometheus-insecure-skip-verify"] == "true",
Auth: providers.ClientAuth{
Username: configSet["prometheus-auth-username"],
Password: configSet["prometheus-auth-password"],
BearerToken: configSet["prometheus-auth-bearertoken"],
},
QueryConcurrency: concurrency,
BRateLimit: configSet["prometheus-bratelimit"] == "true",
MaxPointsLimitPerTimeSeries: maxPoints,
}
promDataProvider, err := prom.NewProvider(&promConfig)
if err != nil {
return err
}
ctx.DataProviders = map[providers.DataSourceType]providers.Interface{
providers.PrometheusDataSource: promDataProvider,
}
case string(providers.GrpcDataSource):
// not support grpc yet
if len(grpcKeys) != 0 {
return fmt.Errorf("in replicas recommender, you need set grpc config %v for history data provider", providers.PrometheusConfigKeys)
}
timeOut := time.Minute
if value, ok := configSet["grpc-ds-timeout"]; ok {
timeOut, _ = time.ParseDuration(value)
}
address := "localhost:50051"
if value, ok := configSet["grpc-ds-address"]; ok {
address = value
}
grpcConfig := providers.GrpcConfig{
Address: address,
Timeout: timeOut,
}
grpcDataProvider := grpc.NewProvider(&grpcConfig)
ctx.DataProviders = map[providers.DataSourceType]providers.Interface{
providers.GrpcDataSource: grpcDataProvider,
}
default:
return fmt.Errorf("replicas recommender only support %v and %v provider", providers.PrometheusDataSource, providers.GrpcDataSource)
}
}
*/
// 2. if not set data provider, will use default
// do nothing
return nil
}
func (rr *ReplicasRecommender) CollectData(ctx *framework.RecommendationContext) error {
klog.V(4).Infof("%s CpuQuery %s RecommendationRule %s", rr.Name(), ctx.MetricNamer.BuildUniqueKey(), klog.KObj(&ctx.RecommendationRule))
timeNow := time.Now()
tsList, err := ctx.DataProviders[providers.PrometheusDataSource].QueryTimeSeries(ctx.MetricNamer, timeNow.Add(-time.Hour*24*7), timeNow, time.Minute)
if err != nil {
return fmt.Errorf("%s query historic metrics failed: %v ", rr.Name(), err)
}
if len(tsList) != 1 {
return fmt.Errorf("%s query historic metrics data is unexpected, List length is %d ", rr.Name(), len(tsList))
}
ctx.InputValues = tsList
return nil
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment