Unverified Commit abdb63c3 authored by qmhu's avatar qmhu Committed by GitHub
Browse files

Merge pull request #390 from qmhu/external-metric

Prediction for external metrics
Showing with 568 additions and 224 deletions
+568 -224
......@@ -54,12 +54,11 @@ type MetricAdapter struct {
}
func (a *MetricAdapter) makeCustomMetricProvider(remoteAdapter *metricprovider.RemoteAdapter, client client.Client, recorder record.EventRecorder) provider.CustomMetricsProvider {
return metricprovider.NewCustomMetricProvider(client, remoteAdapter, recorder)
}
func (a *MetricAdapter) makeExternalMetricProvider(client client.Client, recorder record.EventRecorder, scaleClient scale.ScalesGetter, restMapper meta.RESTMapper) *metricprovider.ExternalMetricProvider {
return metricprovider.NewExternalMetricProvider(client, recorder, scaleClient, restMapper)
func (a *MetricAdapter) makeExternalMetricProvider(remoteAdapter *metricprovider.RemoteAdapter, client client.Client, recorder record.EventRecorder, scaleClient scale.ScalesGetter, restMapper meta.RESTMapper) *metricprovider.ExternalMetricProvider {
return metricprovider.NewExternalMetricProvider(client, remoteAdapter, recorder, scaleClient, restMapper)
}
func main() {
......@@ -144,7 +143,7 @@ func main() {
ctx := signals.SetupSignalHandler()
customMetricProvider := cmd.makeCustomMetricProvider(remoteAdapter, client, recorder)
externalMetricProvider := cmd.makeExternalMetricProvider(client, recorder, scaleClient, restMapper)
externalMetricProvider := cmd.makeExternalMetricProvider(remoteAdapter, client, recorder, scaleClient, restMapper)
cmd.WithCustomMetrics(customMetricProvider)
cmd.WithExternalMetrics(externalMetricProvider)
......
......@@ -11,20 +11,6 @@ spec:
insecureSkipTLSVerify: true
groupPriorityMinimum: 100
versionPriority: 100
---
apiVersion: apiregistration.k8s.io/v1
kind: APIService
metadata:
name: v1beta2.custom.metrics.k8s.io
spec:
service:
name: metric-adapter
namespace: crane-system
group: custom.metrics.k8s.io
version: v1beta2
insecureSkipTLSVerify: true
groupPriorityMinimum: 100
versionPriority: 200
---
apiVersion: apiregistration.k8s.io/v1
......
......@@ -10,7 +10,7 @@ spec:
name: php-apache
minReplicas: 1 # MinReplicas is the lower limit replicas to the scale target which the autoscaler can scale down to.
maxReplicas: 10 # MaxReplicas is the upper limit replicas to the scale target which the autoscaler can scale up to.
scaleStrategy: Auto # ScaleStrategy indicate the strategy to scaling target, value can be "Auto" and "Manual".
scaleStrategy: Auto # ScaleStrategy indicate the strategy to scaling target, value can be "Auto" and "Preview".
# Better to setting cron to fill the one complete time period such as one day, one week
# Below is one day cron scheduling, it
#100 -------- --------- ----------
......
......@@ -10,7 +10,7 @@ spec:
name: php-apache
minReplicas: 1 # MinReplicas is the lower limit replicas to the scale target which the autoscaler can scale down to.
maxReplicas: 10 # MaxReplicas is the upper limit replicas to the scale target which the autoscaler can scale up to.
scaleStrategy: Auto # ScaleStrategy indicate the strategy to scaling target, value can be "Auto" and "Manual".
scaleStrategy: Auto # ScaleStrategy indicate the strategy to scaling target, value can be "Auto" and "Preview".
# Better to setting cron to fill the one complete time period such as one day, one week
# Below is one day cron scheduling, it
#100 -------- --------- ----------
......
......@@ -10,7 +10,7 @@ spec:
name: php-apache
minReplicas: 1 # MinReplicas is the lower limit replicas to the scale target which the autoscaler can scale down to.
maxReplicas: 10 # MaxReplicas is the upper limit replicas to the scale target which the autoscaler can scale up to.
scaleStrategy: Auto # ScaleStrategy indicate the strategy to scaling target, value can be "Auto" and "Manual".
scaleStrategy: Auto # ScaleStrategy indicate the strategy to scaling target, value can be "Auto" and "Preview".
# Better to setting cron to fill the one complete time period such as one day, one week
# Below is one day cron scheduling, it
#100 -------- --------- ----------
......
......@@ -10,7 +10,7 @@ spec:
name: php-apache
minReplicas: 1 # MinReplicas is the lower limit replicas to the scale target which the autoscaler can scale down to.
maxReplicas: 10 # MaxReplicas is the upper limit replicas to the scale target which the autoscaler can scale up to.
scaleStrategy: Auto # ScaleStrategy indicate the strategy to scaling target, value can be "Auto" and "Manual".
scaleStrategy: Auto # ScaleStrategy indicate the strategy to scaling target, value can be "Auto" and "Preview".
# Metrics contains the specifications for which to use to calculate the desired replica count.
metrics:
- type: Resource
......@@ -27,4 +27,4 @@ spec:
algorithmType: dsp
dsp:
sampleInterval: "60s"
historyLength: "3d"
historyLength: "7d"
......@@ -4,7 +4,7 @@ go 1.17
require (
github.com/go-echarts/go-echarts/v2 v2.2.4
github.com/gocrane/api v0.4.1-0.20220520134105-09d430d903ac
github.com/gocrane/api v0.5.1-0.20220706040335-eaadbb4b99ed
github.com/google/cadvisor v0.39.2
github.com/mjibson/go-dsp v0.0.0-20180508042940-11479a337f12
github.com/prometheus/client_golang v1.11.0
......@@ -181,6 +181,7 @@ require (
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/tools v0.1.8 // indirect
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
google.golang.org/protobuf v1.27.1
)
replace (
......
......@@ -314,6 +314,8 @@ github.com/gocrane/api v0.4.1-0.20220507041258-d376db2b4ad4 h1:vGDg3G6y661KAlhjf
github.com/gocrane/api v0.4.1-0.20220507041258-d376db2b4ad4/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
github.com/gocrane/api v0.4.1-0.20220520134105-09d430d903ac h1:lBKVVOA4del0Plj80PCE+nglxaJxaXanCv5N6a3laVY=
github.com/gocrane/api v0.4.1-0.20220520134105-09d430d903ac/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
github.com/gocrane/api v0.5.1-0.20220706040335-eaadbb4b99ed h1:aARCU+Hs1ZKTqJFJT/4/or/iGR6qYwMcG99CGmBFJpg=
github.com/gocrane/api v0.5.1-0.20220706040335-eaadbb4b99ed/go.mod h1:GxI+t9AW8+NsHkz2JkPBIJN//9eLUjTZl1ScYAbXMbk=
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
github.com/godbus/dbus/v5 v5.0.4 h1:9349emZab16e7zQvpmsbtjc18ykshndd8y2PG3sgJbA=
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
......
......@@ -78,17 +78,18 @@ func (c *EffectiveHPAController) Reconcile(ctx context.Context, req ctrl.Request
}
// reconcile prediction if enabled
var tsp *predictionapi.TimeSeriesPrediction
if utils.IsEHPAPredictionEnabled(ehpa) && utils.IsEHPAHasPredictionMetric(ehpa) {
prediction, err := c.ReconcilePredication(ctx, ehpa)
tsp, err = c.ReconcilePredication(ctx, ehpa)
if err != nil {
setCondition(newStatus, autoscalingapi.Ready, metav1.ConditionFalse, "FailedReconcilePrediction", err.Error())
c.UpdateStatus(ctx, ehpa, newStatus)
return ctrl.Result{}, err
}
setPredictionCondition(newStatus, prediction.Status.Conditions)
setPredictionCondition(newStatus, tsp.Status.Conditions)
}
hpa, err := c.ReconcileHPA(ctx, ehpa, substitute, newStatus)
hpa, err := c.ReconcileHPA(ctx, ehpa, substitute, tsp)
if err != nil {
setCondition(newStatus, autoscalingapi.Ready, metav1.ConditionFalse, "FailedReconcileHPA", err.Error())
c.UpdateStatus(ctx, ehpa, newStatus)
......
......@@ -16,13 +16,14 @@ import (
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
autoscalingapi "github.com/gocrane/api/autoscaling/v1alpha1"
predictionapi "github.com/gocrane/api/prediction/v1alpha1"
"github.com/gocrane/crane/pkg/known"
"github.com/gocrane/crane/pkg/metricprovider"
"github.com/gocrane/crane/pkg/utils"
)
func (c *EffectiveHPAController) ReconcileHPA(ctx context.Context, ehpa *autoscalingapi.EffectiveHorizontalPodAutoscaler, substitute *autoscalingapi.Substitute, status *autoscalingapi.EffectiveHorizontalPodAutoscalerStatus) (*autoscalingv2.HorizontalPodAutoscaler, error) {
func (c *EffectiveHPAController) ReconcileHPA(ctx context.Context, ehpa *autoscalingapi.EffectiveHorizontalPodAutoscaler, substitute *autoscalingapi.Substitute, tsp *predictionapi.TimeSeriesPrediction) (*autoscalingv2.HorizontalPodAutoscaler, error) {
hpaList := &autoscalingv2.HorizontalPodAutoscalerList{}
opts := []client.ListOption{
client.MatchingLabels(map[string]string{known.EffectiveHorizontalPodAutoscalerUidLabel: string(ehpa.UID)}),
......@@ -30,17 +31,17 @@ func (c *EffectiveHPAController) ReconcileHPA(ctx context.Context, ehpa *autosca
err := c.Client.List(ctx, hpaList, opts...)
if err != nil {
if errors.IsNotFound(err) {
return c.CreateHPA(ctx, ehpa, substitute, status)
return c.CreateHPA(ctx, ehpa, substitute, tsp)
} else {
c.Recorder.Event(ehpa, v1.EventTypeNormal, "FailedGetHPA", err.Error())
klog.Error("Failed to get HPA, ehpa %s error %v", klog.KObj(ehpa), err)
return nil, err
}
} else if len(hpaList.Items) == 0 {
return c.CreateHPA(ctx, ehpa, substitute, status)
return c.CreateHPA(ctx, ehpa, substitute, tsp)
}
return c.UpdateHPAIfNeed(ctx, ehpa, &hpaList.Items[0], substitute, status)
return c.UpdateHPAIfNeed(ctx, ehpa, &hpaList.Items[0], substitute, tsp)
}
func (c *EffectiveHPAController) GetHPA(ctx context.Context, ehpa *autoscalingapi.EffectiveHorizontalPodAutoscaler) (*autoscalingv2.HorizontalPodAutoscaler, error) {
......@@ -58,8 +59,8 @@ func (c *EffectiveHPAController) GetHPA(ctx context.Context, ehpa *autoscalingap
return &hpaList.Items[0], nil
}
func (c *EffectiveHPAController) CreateHPA(ctx context.Context, ehpa *autoscalingapi.EffectiveHorizontalPodAutoscaler, substitute *autoscalingapi.Substitute, status *autoscalingapi.EffectiveHorizontalPodAutoscalerStatus) (*autoscalingv2.HorizontalPodAutoscaler, error) {
hpa, err := c.NewHPAObject(ctx, ehpa, substitute, status)
func (c *EffectiveHPAController) CreateHPA(ctx context.Context, ehpa *autoscalingapi.EffectiveHorizontalPodAutoscaler, substitute *autoscalingapi.Substitute, tsp *predictionapi.TimeSeriesPrediction) (*autoscalingv2.HorizontalPodAutoscaler, error) {
hpa, err := c.NewHPAObject(ctx, ehpa, substitute, tsp)
if err != nil {
c.Recorder.Event(ehpa, v1.EventTypeNormal, "FailedCreateHPAObject", err.Error())
klog.Errorf("Failed to create object, HorizontalPodAutoscaler %s error %v", klog.KObj(hpa), err)
......@@ -79,8 +80,8 @@ func (c *EffectiveHPAController) CreateHPA(ctx context.Context, ehpa *autoscalin
return hpa, nil
}
func (c *EffectiveHPAController) NewHPAObject(ctx context.Context, ehpa *autoscalingapi.EffectiveHorizontalPodAutoscaler, substitute *autoscalingapi.Substitute, status *autoscalingapi.EffectiveHorizontalPodAutoscalerStatus) (*autoscalingv2.HorizontalPodAutoscaler, error) {
metrics, err := c.GetHPAMetrics(ctx, ehpa, status)
func (c *EffectiveHPAController) NewHPAObject(ctx context.Context, ehpa *autoscalingapi.EffectiveHorizontalPodAutoscaler, substitute *autoscalingapi.Substitute, tsp *predictionapi.TimeSeriesPrediction) (*autoscalingv2.HorizontalPodAutoscaler, error) {
metrics, err := c.GetHPAMetrics(ctx, ehpa, tsp)
if err != nil {
return nil, err
}
......@@ -135,9 +136,9 @@ func (c *EffectiveHPAController) NewHPAObject(ctx context.Context, ehpa *autosca
return hpa, nil
}
func (c *EffectiveHPAController) UpdateHPAIfNeed(ctx context.Context, ehpa *autoscalingapi.EffectiveHorizontalPodAutoscaler, hpaExist *autoscalingv2.HorizontalPodAutoscaler, substitute *autoscalingapi.Substitute, status *autoscalingapi.EffectiveHorizontalPodAutoscalerStatus) (*autoscalingv2.HorizontalPodAutoscaler, error) {
func (c *EffectiveHPAController) UpdateHPAIfNeed(ctx context.Context, ehpa *autoscalingapi.EffectiveHorizontalPodAutoscaler, hpaExist *autoscalingv2.HorizontalPodAutoscaler, substitute *autoscalingapi.Substitute, tsp *predictionapi.TimeSeriesPrediction) (*autoscalingv2.HorizontalPodAutoscaler, error) {
var needUpdate bool
hpa, err := c.NewHPAObject(ctx, ehpa, substitute, status)
hpa, err := c.NewHPAObject(ctx, ehpa, substitute, tsp)
if err != nil {
c.Recorder.Event(ehpa, v1.EventTypeNormal, "FailedCreateHPAObject", err.Error())
klog.Errorf("Failed to create object, HorizontalPodAutoscaler %s error %v", klog.KObj(hpa), err)
......@@ -173,15 +174,15 @@ func (c *EffectiveHPAController) UpdateHPAIfNeed(ctx context.Context, ehpa *auto
}
// GetHPAMetrics loop metricSpec in EffectiveHorizontalPodAutoscaler and generate metricSpec for HPA
func (c *EffectiveHPAController) GetHPAMetrics(ctx context.Context, ehpa *autoscalingapi.EffectiveHorizontalPodAutoscaler, status *autoscalingapi.EffectiveHorizontalPodAutoscalerStatus) ([]autoscalingv2.MetricSpec, error) {
func (c *EffectiveHPAController) GetHPAMetrics(ctx context.Context, ehpa *autoscalingapi.EffectiveHorizontalPodAutoscaler, tsp *predictionapi.TimeSeriesPrediction) ([]autoscalingv2.MetricSpec, error) {
var metrics []autoscalingv2.MetricSpec
for _, metric := range ehpa.Spec.Metrics {
copyMetric := metric.DeepCopy()
metrics = append(metrics, *copyMetric)
}
if utils.IsEHPAPredictionEnabled(ehpa) && isPredictionReady(status) {
var customMetricsForPrediction []autoscalingv2.MetricSpec
if utils.IsEHPAPredictionEnabled(ehpa) {
var metricsForPrediction []autoscalingv2.MetricSpec
for _, metric := range metrics {
// generate a custom metric for resource metric
......@@ -191,6 +192,11 @@ func (c *EffectiveHPAController) GetHPAMetrics(ctx context.Context, ehpa *autosc
continue
}
if _, err := utils.GetReadyPredictionMetric(name, tsp); err != nil {
// metric is not predictable
continue
}
customMetric := &autoscalingv2.PodsMetricSource{
Metric: autoscalingv2.MetricIdentifier{
Name: name,
......@@ -240,11 +246,79 @@ func (c *EffectiveHPAController) GetHPAMetrics(ctx context.Context, ehpa *autosc
}
metricSpec := autoscalingv2.MetricSpec{Pods: customMetric, Type: autoscalingv2.PodsMetricSourceType}
customMetricsForPrediction = append(customMetricsForPrediction, metricSpec)
metricsForPrediction = append(metricsForPrediction, metricSpec)
}
// generate a external metric for external metric
if metric.Type == autoscalingv2.ExternalMetricSourceType {
name := utils.GetGeneralPredictionMetricName(metric.Type, false, metric.External.Metric.Name)
expressionQuery := utils.GetExpressionQuery(metric.External.Metric.Name, ehpa.Annotations)
if len(expressionQuery) == 0 {
continue
}
if _, err := utils.GetReadyPredictionMetric(name, tsp); err != nil {
// metric is not predictable
continue
}
external := &autoscalingv2.ExternalMetricSource{
Metric: autoscalingv2.MetricIdentifier{
Name: name,
// add known.EffectiveHorizontalPodAutoscalerUidLabel=uid in metric.selector
// MetricAdapter use label selector to match the matching TimeSeriesPrediction to return metrics
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
known.EffectiveHorizontalPodAutoscalerUidLabel: string(ehpa.UID),
},
},
},
Target: autoscalingv2.MetricTarget{
Type: autoscalingv2.AverageValueMetricType,
AverageValue: metric.External.Target.AverageValue,
},
}
metricSpec := autoscalingv2.MetricSpec{External: external, Type: autoscalingv2.ExternalMetricSourceType}
metricsForPrediction = append(metricsForPrediction, metricSpec)
}
// generate a custom metric for pods metric
if metric.Type == autoscalingv2.PodsMetricSourceType {
name := utils.GetGeneralPredictionMetricName(metric.Type, false, metric.Pods.Metric.Name)
expressionQuery := utils.GetExpressionQuery(metric.Pods.Metric.Name, ehpa.Annotations)
if len(expressionQuery) == 0 {
continue
}
if _, err := utils.GetReadyPredictionMetric(name, tsp); err != nil {
// metric is not predictable
continue
}
podsMetric := &autoscalingv2.PodsMetricSource{
Metric: autoscalingv2.MetricIdentifier{
Name: name,
// add known.EffectiveHorizontalPodAutoscalerUidLabel=uid in metric.selector
// MetricAdapter use label selector to match the matching TimeSeriesPrediction to return metrics
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
known.EffectiveHorizontalPodAutoscalerUidLabel: string(ehpa.UID),
},
},
},
Target: autoscalingv2.MetricTarget{
Type: autoscalingv2.AverageValueMetricType,
AverageValue: metric.Pods.Target.AverageValue,
},
}
metricSpec := autoscalingv2.MetricSpec{Pods: podsMetric, Type: autoscalingv2.PodsMetricSourceType}
metricsForPrediction = append(metricsForPrediction, metricSpec)
}
}
metrics = append(metrics, customMetricsForPrediction...)
metrics = append(metrics, metricsForPrediction...)
}
// Construct cron external metrics for cron scale
......@@ -263,7 +337,7 @@ func GetCronMetricSpecsForHPA(ehpa *autoscalingapi.EffectiveHorizontalPodAutosca
Type: autoscalingv2.ExternalMetricSourceType,
External: &autoscalingv2.ExternalMetricSource{
Metric: autoscalingv2.MetricIdentifier{
Name: ehpa.Name,
Name: utils.GetGeneralPredictionMetricName(autoscalingv2.PodsMetricSourceType, true, ehpa.Name),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
known.EffectiveHorizontalPodAutoscalerUidLabel: string(ehpa.UID),
......
......@@ -144,6 +144,35 @@ func (c *EffectiveHPAController) NewPredictionObject(ehpa *autoscalingapi.Effect
},
})
}
// get expressionQuery according to metric.Type
var expressionQuery string
var metricName string
switch metric.Type {
case autoscalingv2.ExternalMetricSourceType:
expressionQuery = utils.GetExpressionQuery(metric.External.Metric.Name, ehpa.Annotations)
metricName = metric.External.Metric.Name
case autoscalingv2.PodsMetricSourceType:
expressionQuery = utils.GetExpressionQuery(metric.Pods.Metric.Name, ehpa.Annotations)
metricName = metric.Pods.Metric.Name
}
if len(expressionQuery) == 0 {
continue
}
metricIdentifier := utils.GetGeneralPredictionMetricName(metric.Type, false, metricName)
predictionMetrics = append(predictionMetrics, predictionapi.PredictionMetric{
ResourceIdentifier: metricIdentifier,
Type: predictionapi.ExpressionQueryMetricType,
ExpressionQuery: &predictionapi.ExpressionQuery{
Expression: expressionQuery,
},
Algorithm: predictionapi.Algorithm{
AlgorithmType: ehpa.Spec.Prediction.PredictionAlgorithm.AlgorithmType,
DSP: ehpa.Spec.Prediction.PredictionAlgorithm.DSP,
Percentile: ehpa.Spec.Prediction.PredictionAlgorithm.Percentile,
},
})
}
prediction.Spec.PredictionMetrics = predictionMetrics
......@@ -164,13 +193,3 @@ func setPredictionCondition(status *autoscalingapi.EffectiveHorizontalPodAutosca
}
}
}
func isPredictionReady(status *autoscalingapi.EffectiveHorizontalPodAutoscalerStatus) bool {
for _, cond := range status.Conditions {
if cond.Type == string(autoscalingapi.PredictionReady) && cond.Status == metav1.ConditionTrue {
return true
}
}
return false
}
......@@ -5,13 +5,13 @@ import (
"encoding/json"
"fmt"
"sort"
"strings"
"time"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/klog/v2"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
......@@ -56,14 +56,8 @@ func (tc *Controller) syncPredictionStatus(ctx context.Context, tsPrediction *pr
predictionEnd := predictionStart.Add(time.Duration(tsPrediction.Spec.PredictionWindowSeconds) * time.Second * 2)
predictedData, err := tc.doPredict(tsPrediction, predictionStart, predictionEnd)
if err != nil {
tc.Recorder.Event(tsPrediction, v1.EventTypeWarning, "FailedPredict", err.Error())
klog.Errorf("Failed to doPredict, err: %v", err)
return ctrl.Result{RequeueAfter: tc.UpdatePeriod}, err
}
newStatus.PredictionMetrics = predictedData
if len(tsPrediction.Spec.PredictionMetrics) != len(predictedData) {
if len(tsPrediction.Spec.PredictionMetrics) != len(predictedData) || err != nil {
klog.V(4).Infof("DoPredict predict data is partial, predictedDataLen: %v, key: %v", len(predictedData), key)
setCondition(newStatus, predictionapi.TimeSeriesPredictionConditionReady, metav1.ConditionFalse, known.ReasonTimeSeriesPredictPartial, "not all metric predicted")
err = tc.UpdateStatus(ctx, tsPrediction, newStatus)
......@@ -74,29 +68,17 @@ func (tc *Controller) syncPredictionStatus(ctx context.Context, tsPrediction *pr
return ctrl.Result{RequeueAfter: tc.UpdatePeriod}, err
}
windowStart := predictionStart
windowEnd := predictionStart.Add(time.Duration(tsPrediction.Spec.PredictionWindowSeconds) * time.Second)
warnings := tc.isPredictionDataOutDated(windowStart, windowEnd, predictedData)
if len(warnings) > 0 {
klog.V(4).Infof("DoPredict predict data is partial, range: %v, key: %v", fmt.Sprintf("[%v, %v]", windowStart, windowEnd), key)
setCondition(newStatus, predictionapi.TimeSeriesPredictionConditionReady, metav1.ConditionFalse, known.ReasonTimeSeriesPredictPartial, strings.Join(warnings, ";"))
err = tc.UpdateStatus(ctx, tsPrediction, newStatus)
if err != nil {
// todo
return ctrl.Result{}, err
}
} else {
klog.V(4).Infof("DoPredict predict data is complete, range: %v, key: %v", fmt.Sprintf("[%v, %v]", windowStart, windowEnd), key)
// status.conditions.reason in body should be at least 1 chars long
setCondition(newStatus, predictionapi.TimeSeriesPredictionConditionReady, metav1.ConditionTrue, known.ReasonTimeSeriesPredictSucceed, "")
klog.V(4).Infof("DoPredict predict data is complete, range: %v, key: %v", fmt.Sprintf("[%v, %v]", windowStart, windowEnd), key)
// status.conditions.reason in body should be at least 1 chars long
setCondition(newStatus, predictionapi.TimeSeriesPredictionConditionReady, metav1.ConditionTrue, known.ReasonTimeSeriesPredictSucceed, "")
err = tc.UpdateStatus(ctx, tsPrediction, newStatus)
if err != nil {
// todo: update status failed, then add it again for update?
return ctrl.Result{}, err
}
return ctrl.Result{RequeueAfter: tc.UpdatePeriod}, nil
err = tc.UpdateStatus(ctx, tsPrediction, newStatus)
if err != nil {
// todo: update status failed, then add it again for update?
return ctrl.Result{}, err
}
return ctrl.Result{RequeueAfter: tc.UpdatePeriod}, nil
}
return ctrl.Result{RequeueAfter: tc.UpdatePeriod}, nil
}
......@@ -131,23 +113,29 @@ func (tc *Controller) doPredict(tsPrediction *predictionapi.TimeSeriesPrediction
if err != nil {
return nil, err
}
var errs []error
for _, metric := range tsPrediction.Spec.PredictionMetrics {
status := predictionapi.PredictionMetricStatus{ResourceIdentifier: metric.ResourceIdentifier, Ready: false}
predictor := tc.getPredictor(metric.Algorithm.AlgorithmType)
if predictor == nil {
return result, fmt.Errorf("do not support algorithm type %v for metric %v", metric.Algorithm.AlgorithmType, metric.ResourceIdentifier)
errs = append(errs, fmt.Errorf("do not support algorithm type %v for metric %v", metric.Algorithm.AlgorithmType, metric.ResourceIdentifier))
continue
}
internalConf := c.ConvertApiMetric2InternalConfig(&metric)
namer := c.GetMetricNamer(&metric)
err := predictor.WithQuery(namer, c.GetCaller(), *internalConf)
err = predictor.WithQuery(namer, c.GetCaller(), *internalConf)
if err != nil {
return result, err
errs = append(errs, err)
continue
}
var data []*common.TimeSeries
// percentile is ok for time series
data, err = predictor.QueryPredictedTimeSeries(context.TODO(), namer, start, end)
if err != nil {
return result, err
errs = append(errs, err)
continue
}
predictedData := CommonTimeSeries2ApiTimeSeries(data)
if klog.V(6).Enabled() {
......@@ -155,9 +143,32 @@ func (tc *Controller) doPredict(tsPrediction *predictionapi.TimeSeriesPrediction
dataBytes, err2 := json.Marshal(data)
klog.V(6).Infof("DoPredict predicted data details, key: %v, queryExpr: %v, apiData: %v, predictData: %v, errs: %+v", klog.KObj(tsPrediction), namer.BuildUniqueKey(), string(apiDataBytes), string(dataBytes), []error{err1, err2})
}
result = append(result, predictionapi.PredictionMetricStatus{ResourceIdentifier: metric.ResourceIdentifier, Prediction: predictedData})
status.Prediction = predictedData
// prediction data checking
if len(status.Prediction) == 0 {
err = fmt.Errorf("metric %v no predict data", status.ResourceIdentifier)
}
for i, ts := range status.Prediction {
if !IsWindowInSamples(start, end, ts.Samples) {
err = fmt.Errorf("metric %v, ts %v, predict data is outdated, labels: %+v", status.ResourceIdentifier, i, ts.Labels)
}
}
if err != nil {
errs = append(errs, err)
} else {
status.Ready = true
}
result = append(result, status)
}
if len(errs) != 0 {
err = utilerrors.NewAggregate(errs)
}
return result, nil
return result, err
}
func (tc *Controller) UpdateStatus(ctx context.Context, tsPrediction *predictionapi.TimeSeriesPrediction, newStatus *predictionapi.TimeSeriesPredictionStatus) error {
......
......@@ -7,5 +7,6 @@ const (
)
const (
EffectiveHorizontalPodAutoscalerCurrentMetricsAnnotation = "autoscaling.crane.io/effective-hpa-current-metrics"
EffectiveHorizontalPodAutoscalerCurrentMetricsAnnotation = "autoscaling.crane.io/effective-hpa-current-metrics"
EffectiveHorizontalPodAutoscalerExternalMetricsAnnotationPrefix = "metric-query.autoscaling.crane.io"
)
......@@ -5,8 +5,10 @@ import (
"fmt"
"math"
"strconv"
"strings"
"time"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
v1 "k8s.io/api/core/v1"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
......@@ -20,6 +22,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/custom-metrics-apiserver/pkg/provider"
autoscalingapi "github.com/gocrane/api/autoscaling/v1alpha1"
predictionapi "github.com/gocrane/api/prediction/v1alpha1"
"github.com/gocrane/crane/pkg/known"
......@@ -53,7 +56,7 @@ func NewCustomMetricProvider(client client.Client, remoteAdapter *RemoteAdapter,
func (p *CustomMetricProvider) GetMetricByName(ctx context.Context, name types.NamespacedName, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValue, error) {
klog.Info(fmt.Sprintf("Get metric by name for custom metric, GroupResource %s namespacedName %s metric %s metricSelector %s", info.GroupResource.String(), name.String(), info.Metric, metricSelector.String()))
if !IsLocalMetric(info) {
if !IsLocalCustomMetric(info, p.client) {
if p.remoteAdapter != nil {
return p.remoteAdapter.GetMetricByName(ctx, name, info, metricSelector)
} else {
......@@ -64,11 +67,11 @@ func (p *CustomMetricProvider) GetMetricByName(ctx context.Context, name types.N
return nil, apiErrors.NewServiceUnavailable("not supported")
}
// GetMetricBySelector fetches metric for pod resources, get predictive metric from giving selector
// GetMetricBySelector fetches metric for custom resources, get predictive metric from giving selector
func (p *CustomMetricProvider) GetMetricBySelector(ctx context.Context, namespace string, selector labels.Selector, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValueList, error) {
klog.Info(fmt.Sprintf("Get metric by selector for custom metric, Info %v namespace %s selector %s metricSelector %s", info, namespace, selector.String(), metricSelector.String()))
if !IsLocalMetric(info) {
if !IsLocalCustomMetric(info, p.client) {
if p.remoteAdapter != nil {
return p.remoteAdapter.GetMetricBySelector(ctx, namespace, selector, info, metricSelector)
} else {
......@@ -76,109 +79,92 @@ func (p *CustomMetricProvider) GetMetricBySelector(ctx context.Context, namespac
}
}
var matchingMetrics []custom_metrics.MetricValue
prediction, err := p.GetPrediction(ctx, namespace, metricSelector)
if err != nil {
return nil, err
}
pods, err := p.GetPods(ctx, namespace, selector)
if err != nil {
return nil, err
}
availablePods := utils.GetAvailablePods(pods)
if len(availablePods) == 0 {
return nil, fmt.Errorf("failed to get available pods. ")
}
isPredicting := false
// check prediction is ongoing
if prediction.Status.Conditions != nil {
for _, condition := range prediction.Status.Conditions {
if condition.Type == string(predictionapi.TimeSeriesPredictionConditionReady) && condition.Status == metav1.ConditionTrue {
isPredicting = true
}
if strings.HasPrefix(info.Metric, "crane") {
var matchingMetrics []custom_metrics.MetricValue
prediction, err := GetPrediction(ctx, p.client, namespace, metricSelector)
if err != nil {
return nil, err
}
}
if !isPredicting {
return nil, fmt.Errorf("TimeSeriesPrediction is not ready. ")
}
var timeSeries *predictionapi.MetricTimeSeries
for _, metricStatus := range prediction.Status.PredictionMetrics {
if metricStatus.ResourceIdentifier == info.Metric && len(metricStatus.Prediction) == 1 {
timeSeries = metricStatus.Prediction[0]
pods, err := p.GetPods(ctx, namespace, selector)
if err != nil {
return nil, err
}
}
// check time series for current metric is empty
if timeSeries == nil {
return nil, fmt.Errorf("TimeSeries is empty, metric name %s", info.Metric)
}
// get the largest value from timeSeries
// use the largest value will bring up the scaling up and defer the scaling down
timestampStart := time.Now()
timestampEnd := timestampStart.Add(time.Duration(prediction.Spec.PredictionWindowSeconds) * time.Second)
largestMetricValue := &metricValue{}
hasValidSample := false
for _, v := range timeSeries.Samples {
// exclude values that not in time range
if v.Timestamp < timestampStart.Unix() || v.Timestamp > timestampEnd.Unix() {
continue
availablePods := utils.GetAvailablePods(pods)
if len(availablePods) == 0 {
return nil, fmt.Errorf("failed to get available pods. ")
}
valueFloat, err := strconv.ParseFloat(v.Value, 32)
timeSeries, err := utils.GetReadyPredictionMetric(info.Metric, prediction)
if err != nil {
return nil, fmt.Errorf("failed to parse value to float: %v ", err)
}
if valueFloat > largestMetricValue.value {
hasValidSample = true
largestMetricValue.value = valueFloat
largestMetricValue.timestamp = v.Timestamp
return nil, err
}
}
if !hasValidSample {
return nil, fmt.Errorf("TimeSeries is outdated, metric name %s", info.Metric)
}
// get the largest value from timeSeries
// use the largest value will bring up the scaling up and defer the scaling down
timestampStart := time.Now()
timestampEnd := timestampStart.Add(time.Duration(prediction.Spec.PredictionWindowSeconds) * time.Second)
largestMetricValue := &metricValue{}
hasValidSample := false
for _, v := range timeSeries.Samples {
// exclude values that not in time range
if v.Timestamp < timestampStart.Unix() || v.Timestamp > timestampEnd.Unix() {
continue
}
averageValue := int64(math.Round(largestMetricValue.value * 1000 / float64(len(availablePods))))
klog.Infof("Provide custom metric %s average value %f.", info.Metric, float64(averageValue)/1000)
for _, pod := range availablePods {
metric := custom_metrics.MetricValue{
DescribedObject: custom_metrics.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
Name: pod.Name,
Namespace: namespace,
},
Metric: custom_metrics.MetricIdentifier{
Name: info.Metric,
},
Timestamp: metav1.Now(),
valueFloat, err := strconv.ParseFloat(v.Value, 32)
if err != nil {
return nil, fmt.Errorf("failed to parse value to float: %v ", err)
}
if valueFloat > largestMetricValue.value {
hasValidSample = true
largestMetricValue.value = valueFloat
largestMetricValue.timestamp = v.Timestamp
}
}
if info.Metric == known.MetricNamePodCpuUsage {
metric.Value = *resource.NewMilliQuantity(averageValue, resource.DecimalSI)
if !hasValidSample {
return nil, fmt.Errorf("TimeSeries is outdated, metric name %s", info.Metric)
}
matchingMetrics = append(matchingMetrics, metric)
if info.GroupResource.String() == "pods" {
averageValue := int64(math.Round(largestMetricValue.value * 1000 / float64(len(availablePods)))) // use available replicas for object metric
klog.Infof("Provide pod custom metric %s average value %f.", info.Metric, float64(averageValue)/1000)
for _, pod := range availablePods {
metric := custom_metrics.MetricValue{
DescribedObject: custom_metrics.ObjectReference{
APIVersion: "v1",
Kind: "Pod",
Name: pod.Name,
Namespace: namespace,
},
Metric: custom_metrics.MetricIdentifier{
Name: info.Metric,
},
Timestamp: metav1.Now(),
}
metric.Value = *resource.NewMilliQuantity(averageValue, resource.DecimalSI)
matchingMetrics = append(matchingMetrics, metric)
}
return &custom_metrics.MetricValueList{
Items: matchingMetrics,
}, nil
}
}
return &custom_metrics.MetricValueList{
Items: matchingMetrics,
}, nil
return nil, apiErrors.NewServiceUnavailable("metric not found")
}
// ListAllMetrics returns all available custom metrics.
func (p *CustomMetricProvider) ListAllMetrics() []provider.CustomMetricInfo {
klog.Info("List all custom metrics")
metricInfos := ListAllLocalMetrics()
metricInfos := ListAllLocalMetrics(p.client)
if p.remoteAdapter != nil {
metricInfos = append(metricInfos, p.remoteAdapter.ListAllMetrics()...)
......@@ -187,18 +173,52 @@ func (p *CustomMetricProvider) ListAllMetrics() []provider.CustomMetricInfo {
return metricInfos
}
func ListAllLocalMetrics() []provider.CustomMetricInfo {
return []provider.CustomMetricInfo{
{
GroupResource: schema.GroupResource{Group: "", Resource: "pods"},
Namespaced: true,
Metric: known.MetricNamePodCpuUsage,
},
func ListAllLocalMetrics(client client.Client) []provider.CustomMetricInfo {
var metricInfos []provider.CustomMetricInfo
metricInfos = append(metricInfos, provider.CustomMetricInfo{
GroupResource: schema.GroupResource{Group: "", Resource: "pods"},
Namespaced: true,
Metric: known.MetricNamePodCpuUsage,
})
var ehpaList autoscalingapi.EffectiveHorizontalPodAutoscalerList
err := client.List(context.TODO(), &ehpaList)
if err != nil {
klog.Errorf("Failed to list ehpa: %v", err)
return metricInfos
}
for _, ehpa := range ehpaList.Items {
for _, metric := range ehpa.Spec.Metrics {
if metric.Type == autoscalingv2.PodsMetricSourceType &&
metric.Pods != nil &&
metric.Pods.Metric.Selector != nil &&
metric.Pods.Metric.Selector.MatchLabels != nil {
if _, exist := metric.Pods.Metric.Selector.MatchLabels[known.EffectiveHorizontalPodAutoscalerUidLabel]; exist {
metricName := utils.GetGeneralPredictionMetricName(autoscalingv2.PodsMetricSourceType, false, metric.Pods.Metric.Name)
metricInfos = append(metricInfos, provider.CustomMetricInfo{Metric: metricName})
}
}
}
for _, metric := range ehpa.Spec.Metrics {
if metric.Type == autoscalingv2.ObjectMetricSourceType &&
metric.Object != nil &&
metric.Object.Metric.Selector != nil &&
metric.Object.Metric.Selector.MatchLabels != nil {
if _, exist := metric.Object.Metric.Selector.MatchLabels[known.EffectiveHorizontalPodAutoscalerUidLabel]; exist {
metricName := utils.GetGeneralPredictionMetricName(autoscalingv2.ObjectMetricSourceType, false, metric.Object.Metric.Name)
metricInfos = append(metricInfos, provider.CustomMetricInfo{Metric: metricName})
}
}
}
}
return metricInfos
}
func IsLocalMetric(metricInfo provider.CustomMetricInfo) bool {
for _, info := range ListAllLocalMetrics() {
func IsLocalCustomMetric(metricInfo provider.CustomMetricInfo, client client.Client) bool {
for _, info := range ListAllLocalMetrics(client) {
if info.Namespaced == metricInfo.Namespaced &&
info.Metric == metricInfo.Metric &&
info.GroupResource.String() == metricInfo.GroupResource.String() {
......@@ -209,7 +229,7 @@ func IsLocalMetric(metricInfo provider.CustomMetricInfo) bool {
return false
}
func (p *CustomMetricProvider) GetPrediction(ctx context.Context, namespace string, metricSelector labels.Selector) (*predictionapi.TimeSeriesPrediction, error) {
func GetPrediction(ctx context.Context, kubeclient client.Client, namespace string, metricSelector labels.Selector) (*predictionapi.TimeSeriesPrediction, error) {
labelSelector, err := labels.ConvertSelectorToLabelsMap(metricSelector.String())
if err != nil {
klog.Error(err, "Failed to convert metric selectors to labels")
......@@ -227,7 +247,7 @@ func (p *CustomMetricProvider) GetPrediction(ctx context.Context, namespace stri
matchingLabels,
client.InNamespace(namespace),
}
err = p.client.List(ctx, predictionList, opts...)
err = kubeclient.List(ctx, predictionList, opts...)
if err != nil {
return nil, fmt.Errorf("failed to get TimeSeriesPrediction when get custom metric ")
} else if len(predictionList.Items) != 1 {
......
......@@ -3,9 +3,11 @@ package metricprovider
import (
"context"
"fmt"
"strconv"
"strings"
"time"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
......@@ -18,7 +20,6 @@ import (
"sigs.k8s.io/custom-metrics-apiserver/pkg/provider"
autoscalingapi "github.com/gocrane/api/autoscaling/v1alpha1"
"github.com/gocrane/crane/pkg/known"
"github.com/gocrane/crane/pkg/utils"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
......@@ -28,19 +29,21 @@ var _ provider.ExternalMetricsProvider = &ExternalMetricProvider{}
// ExternalMetricProvider implements ehpa external metric as external metric provider which now support cron metric
type ExternalMetricProvider struct {
client client.Client
recorder record.EventRecorder
scaler scale.ScalesGetter
restMapper meta.RESTMapper
client client.Client
remoteAdapter *RemoteAdapter
recorder record.EventRecorder
scaler scale.ScalesGetter
restMapper meta.RESTMapper
}
// NewExternalMetricProvider returns an instance of ExternalMetricProvider
func NewExternalMetricProvider(client client.Client, recorder record.EventRecorder, scaleClient scale.ScalesGetter, restMapper meta.RESTMapper) *ExternalMetricProvider {
func NewExternalMetricProvider(client client.Client, remoteAdapter *RemoteAdapter, recorder record.EventRecorder, scaleClient scale.ScalesGetter, restMapper meta.RESTMapper) *ExternalMetricProvider {
return &ExternalMetricProvider{
client: client,
recorder: recorder,
scaler: scaleClient,
restMapper: restMapper,
client: client,
remoteAdapter: remoteAdapter,
recorder: recorder,
scaler: scaleClient,
restMapper: restMapper,
}
}
......@@ -50,11 +53,83 @@ const (
DefaultCronTargetMetricValue int32 = 1
)
// GetExternalMetric each ehpa mapping to only one external cron metric. metric name is ehpa name
// GetExternalMetric get external metric according to metric type
func (p *ExternalMetricProvider) GetExternalMetric(ctx context.Context, namespace string, metricSelector labels.Selector, info provider.ExternalMetricInfo) (*external_metrics.ExternalMetricValueList, error) {
klog.Info(fmt.Sprintf("Get metric by selector for external metric, Info %v namespace %s metricSelector %s", info, namespace, metricSelector.String()))
if !IsLocalExternalMetric(info, p.client) {
if p.remoteAdapter != nil {
return p.remoteAdapter.GetExternalMetric(ctx, namespace, metricSelector, info)
} else {
return nil, apiErrors.NewServiceUnavailable("not supported")
}
}
if strings.HasPrefix(info.Metric, "crane_cron") {
return p.GetCronExternalMetrics(ctx, namespace, metricSelector, info)
}
if strings.HasPrefix(info.Metric, "crane") {
prediction, err := GetPrediction(ctx, p.client, namespace, metricSelector)
if err != nil {
return nil, err
}
timeSeries, err := utils.GetReadyPredictionMetric(info.Metric, prediction)
if err != nil {
return nil, err
}
// get the largest value from timeSeries
// use the largest value will bring up the scaling up and defer the scaling down
timestampStart := time.Now()
timestampEnd := timestampStart.Add(time.Duration(prediction.Spec.PredictionWindowSeconds) * time.Second)
largestMetricValue := &metricValue{}
hasValidSample := false
for _, v := range timeSeries.Samples {
// exclude values that not in time range
if v.Timestamp < timestampStart.Unix() || v.Timestamp > timestampEnd.Unix() {
continue
}
valueFloat, err := strconv.ParseFloat(v.Value, 32)
if err != nil {
return nil, fmt.Errorf("failed to parse value to float: %v ", err)
}
if valueFloat > largestMetricValue.value {
hasValidSample = true
largestMetricValue.value = valueFloat
largestMetricValue.timestamp = v.Timestamp
}
}
if !hasValidSample {
return nil, fmt.Errorf("TimeSeries is outdated, metric name %s", info.Metric)
}
klog.Infof("Provide external metric %s average value %f.", info.Metric, largestMetricValue.value)
return &external_metrics.ExternalMetricValueList{Items: []external_metrics.ExternalMetricValue{
{
MetricName: info.Metric,
Timestamp: metav1.Now(),
Value: *resource.NewQuantity(int64(largestMetricValue.value), resource.DecimalSI),
},
}}, nil
}
return nil, apiErrors.NewServiceUnavailable("metric not found")
}
// GetCronExternalMetrics get desired metric value from cron spec
func (p *ExternalMetricProvider) GetCronExternalMetrics(ctx context.Context, namespace string, metricSelector labels.Selector, info provider.ExternalMetricInfo) (*external_metrics.ExternalMetricValueList, error) {
klog.Infof("Get cron metric %s by selector", info.Metric)
var ehpa autoscalingapi.EffectiveHorizontalPodAutoscaler
err := p.client.Get(ctx, client.ObjectKey{Namespace: namespace, Name: info.Metric}, &ehpa)
ehpaName := strings.TrimPrefix(info.Metric, "crane_cron_")
err := p.client.Get(ctx, client.ObjectKey{Namespace: namespace, Name: ehpaName}, &ehpa)
if err != nil {
return &external_metrics.ExternalMetricValueList{}, err
}
......@@ -131,21 +206,55 @@ func (p *ExternalMetricProvider) GetExternalMetric(ctx context.Context, namespac
// ListAllExternalMetrics return external cron metrics
// Fetch metrics from cache directly to avoid the performance issue for apiserver when the metrics is large, because this api is called frequently.
func (p *ExternalMetricProvider) ListAllExternalMetrics() []provider.ExternalMetricInfo {
klog.Info("List all external metrics")
metricInfos := ListAllLocalExternalMetrics(p.client)
if p.remoteAdapter != nil {
metricInfos = append(metricInfos, p.remoteAdapter.ListAllExternalMetrics()...)
}
return metricInfos
}
func ListAllLocalExternalMetrics(client client.Client) []provider.ExternalMetricInfo {
var metricInfos []provider.ExternalMetricInfo
var ehpaList autoscalingapi.EffectiveHorizontalPodAutoscalerList
err := p.client.List(context.TODO(), &ehpaList)
err := client.List(context.TODO(), &ehpaList)
if err != nil {
klog.Errorf("Failed to list ehpa: %v", err)
return metricInfos
}
for _, ehpa := range ehpaList.Items {
if CronEnabled(&ehpa) {
metricInfos = append(metricInfos, provider.ExternalMetricInfo{Metric: ehpa.Name})
metricName := utils.GetGeneralPredictionMetricName(autoscalingv2.PodsMetricSourceType, true, ehpa.Name)
metricInfos = append(metricInfos, provider.ExternalMetricInfo{Metric: metricName})
}
for _, metric := range ehpa.Spec.Metrics {
if metric.Type == autoscalingv2.ExternalMetricSourceType &&
metric.External != nil &&
metric.External.Metric.Selector != nil &&
metric.External.Metric.Selector.MatchLabels != nil {
if _, exist := metric.External.Metric.Selector.MatchLabels[known.EffectiveHorizontalPodAutoscalerUidLabel]; exist {
metricName := utils.GetGeneralPredictionMetricName(autoscalingv2.ExternalMetricSourceType, false, ehpa.Name)
metricInfos = append(metricInfos, provider.ExternalMetricInfo{Metric: metricName})
}
}
}
}
return metricInfos
}
func IsLocalExternalMetric(metricInfo provider.ExternalMetricInfo, client client.Client) bool {
for _, info := range ListAllLocalExternalMetrics(client) {
if info.Metric == metricInfo.Metric {
return true
}
}
return false
}
func CronEnabled(ehpa *autoscalingapi.EffectiveHorizontalPodAutoscaler) bool {
return len(ehpa.Spec.Crons) > 0
}
......
......@@ -6,6 +6,7 @@ import (
"strings"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
......@@ -15,7 +16,10 @@ import (
"k8s.io/klog/v2"
"k8s.io/metrics/pkg/apis/custom_metrics"
"k8s.io/metrics/pkg/apis/custom_metrics/v1beta2"
"k8s.io/metrics/pkg/apis/external_metrics"
externalMetricsAPI "k8s.io/metrics/pkg/apis/external_metrics/v1beta1"
cmClient "k8s.io/metrics/pkg/client/custom_metrics"
emClient "k8s.io/metrics/pkg/client/external_metrics"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/custom-metrics-apiserver/pkg/provider"
......@@ -23,10 +27,11 @@ import (
)
type RemoteAdapter struct {
metricClient cmClient.CustomMetricsClient
apiVersionsGetter cmClient.AvailableAPIsGetter
discoveryClient discovery.CachedDiscoveryInterface
restMapper meta.RESTMapper
metricClient cmClient.CustomMetricsClient
externalMetricClient emClient.ExternalMetricsClient
apiVersionsGetter cmClient.AvailableAPIsGetter
discoveryClient discovery.CachedDiscoveryInterface
restMapper meta.RESTMapper
}
func NewRemoteAdapter(namespace string, name string, port int, config *rest.Config, client client.Client) (*RemoteAdapter, error) {
......@@ -46,25 +51,30 @@ func NewRemoteAdapter(namespace string, name string, port int, config *rest.Conf
// use actual rest mapper here
metricClient := cmClient.NewForConfig(metricConfig, client.RESTMapper(), apiVersionsGetter)
externalMetricsClient, err := emClient.NewForConfig(metricConfig)
if err != nil {
return nil, fmt.Errorf("failed to create external metrics client: %v", err)
}
return &RemoteAdapter{
metricClient: metricClient,
apiVersionsGetter: apiVersionsGetter,
discoveryClient: cachedClient,
restMapper: client.RESTMapper(),
metricClient: metricClient,
externalMetricClient: externalMetricsClient,
apiVersionsGetter: apiVersionsGetter,
discoveryClient: cachedClient,
restMapper: client.RESTMapper(),
}, err
}
// ListAllMetrics returns all available custom metrics.
func (p *RemoteAdapter) ListAllMetrics() []provider.CustomMetricInfo {
klog.Info("List all remote custom metrics")
func (r *RemoteAdapter) ListAllMetrics() []provider.CustomMetricInfo {
klog.Info("List all remote custom metrics in remote adapter")
version, err := p.apiVersionsGetter.PreferredVersion()
version, err := r.apiVersionsGetter.PreferredVersion()
if err != nil {
klog.Errorf("Failed to get preferred version: %v ", err)
return nil
}
resources, err := p.discoveryClient.ServerResourcesForGroupVersion(version.String())
resources, err := r.discoveryClient.ServerResourcesForGroupVersion(version.String())
if err != nil {
klog.Errorf("Failed to get resources for %s: %v", version.String(), err)
return nil
......@@ -88,25 +98,25 @@ func (p *RemoteAdapter) ListAllMetrics() []provider.CustomMetricInfo {
return metricInfos
}
// GetMetricByName get metric from remote adapter
func (p *RemoteAdapter) GetMetricByName(ctx context.Context, name types.NamespacedName, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValue, error) {
klog.Info("Get remote metric by name")
// GetMetricByName get custom metric from remote adapter
func (r *RemoteAdapter) GetMetricByName(ctx context.Context, name types.NamespacedName, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValue, error) {
klog.Infof("Get custom metric %s by name in remote adapter", info.Metric)
kind, err := utils.KindForResource(info.GroupResource.Resource, p.restMapper)
kind, err := utils.KindForResource(info.GroupResource.Resource, r.restMapper)
if err != nil {
return nil, fmt.Errorf("failed to get kind for resource %s: %v ", info.GroupResource.Resource, err)
}
var object *v1beta2.MetricValue
if info.Namespaced {
object, err = p.metricClient.NamespacedMetrics(name.Namespace).GetForObject(
object, err = r.metricClient.NamespacedMetrics(name.Namespace).GetForObject(
schema.GroupKind{Group: info.GroupResource.Group, Kind: kind},
name.Name,
info.Metric,
metricSelector,
)
} else {
object, err = p.metricClient.RootScopedMetrics().GetForObject(
object, err = r.metricClient.RootScopedMetrics().GetForObject(
schema.GroupKind{Group: info.GroupResource.Group, Kind: kind},
name.Name,
info.Metric,
......@@ -136,18 +146,18 @@ func (p *RemoteAdapter) GetMetricByName(ctx context.Context, name types.Namespac
}, nil
}
// GetMetricBySelector get metric from remote
func (p *RemoteAdapter) GetMetricBySelector(ctx context.Context, namespace string, selector labels.Selector, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValueList, error) {
klog.Info("Get remote metric by selector")
// GetMetricBySelector get custom metric from remote
func (r *RemoteAdapter) GetMetricBySelector(ctx context.Context, namespace string, selector labels.Selector, info provider.CustomMetricInfo, metricSelector labels.Selector) (*custom_metrics.MetricValueList, error) {
klog.Infof("Get custom metric %s by selector in remote adapter", info.Metric)
kind, err := utils.KindForResource(info.GroupResource.Resource, p.restMapper)
kind, err := utils.KindForResource(info.GroupResource.Resource, r.restMapper)
if err != nil {
return nil, fmt.Errorf("failed to get kind for resource %s: %v ", info.GroupResource.Resource, err)
}
var objects *v1beta2.MetricValueList
if info.Namespaced {
objects, err = p.metricClient.NamespacedMetrics(namespace).GetForObjects(
objects, err = r.metricClient.NamespacedMetrics(namespace).GetForObjects(
schema.GroupKind{
Group: info.GroupResource.Group,
Kind: kind,
......@@ -157,7 +167,7 @@ func (p *RemoteAdapter) GetMetricBySelector(ctx context.Context, namespace strin
metricSelector,
)
} else {
objects, err = p.metricClient.RootScopedMetrics().GetForObjects(
objects, err = r.metricClient.RootScopedMetrics().GetForObjects(
schema.GroupKind{
Group: info.GroupResource.Group,
Kind: info.GroupResource.Resource,
......@@ -193,3 +203,46 @@ func (p *RemoteAdapter) GetMetricBySelector(ctx context.Context, namespace strin
Items: values,
}, nil
}
// GetExternalMetric get external metric from remote
func (r *RemoteAdapter) GetExternalMetric(ctx context.Context, namespace string, metricSelector labels.Selector, info provider.ExternalMetricInfo) (*external_metrics.ExternalMetricValueList, error) {
klog.Infof("Get external metric %s by selector in remote adapter", info.Metric)
metricList, err := r.externalMetricClient.NamespacedMetrics(namespace).List(info.Metric, metricSelector)
if err != nil {
return nil, fmt.Errorf("failed to get metrics for external metric %s/%s: %v", namespace, info.Metric, err)
}
returnList := &external_metrics.ExternalMetricValueList{
Items: make([]external_metrics.ExternalMetricValue, len(metricList.Items)),
}
for i, m := range metricList.Items {
returnList.Items[i] = external_metrics.ExternalMetricValue{
TypeMeta: metav1.TypeMeta{Kind: m.Kind, APIVersion: m.APIVersion},
MetricName: m.MetricName,
MetricLabels: m.MetricLabels,
Timestamp: m.Timestamp,
WindowSeconds: m.WindowSeconds,
Value: m.Value,
}
}
return returnList, nil
}
// ListAllExternalMetrics list all external metric from remote
func (r *RemoteAdapter) ListAllExternalMetrics() []provider.ExternalMetricInfo {
klog.Info("Get external metric by selector in remote adapter")
var externalMetricInfos []provider.ExternalMetricInfo
resources, err := r.discoveryClient.ServerResourcesForGroupVersion(externalMetricsAPI.SchemeGroupVersion.String())
if err != nil {
klog.Errorf("Failed to get external metric resources for %r: %v", externalMetricsAPI.SchemeGroupVersion, err)
return nil
}
for _, r := range resources.APIResources {
info := provider.ExternalMetricInfo{
Metric: r.Name,
}
externalMetricInfos = append(externalMetricInfos, info)
}
return externalMetricInfos
}
package utils
import (
"fmt"
"regexp"
"strings"
autoscalingv2 "k8s.io/api/autoscaling/v2beta2"
v1 "k8s.io/api/core/v1"
......@@ -23,6 +27,12 @@ func IsEHPAHasPredictionMetric(ehpa *autoscalingapi.EffectiveHorizontalPodAutosc
return true
}
}
for key := range ehpa.Annotations {
if strings.HasPrefix(key, known.EffectiveHorizontalPodAutoscalerExternalMetricsAnnotationPrefix) {
return true
}
}
return false
}
......@@ -31,11 +41,44 @@ func IsEHPACronEnabled(ehpa *autoscalingapi.EffectiveHorizontalPodAutoscaler) bo
}
// GetPredictionMetricName return metric name used by prediction
func GetPredictionMetricName(Name v1.ResourceName) string {
switch Name {
func GetPredictionMetricName(name v1.ResourceName) string {
switch name {
case v1.ResourceCPU:
return known.MetricNamePodCpuUsage
default:
return ""
}
}
// GetGeneralPredictionMetricName return metric name used by prediction
func GetGeneralPredictionMetricName(sourceType autoscalingv2.MetricSourceType, isCron bool, name string) string {
prefix := ""
switch sourceType {
case autoscalingv2.PodsMetricSourceType:
prefix = "custom.pods"
case autoscalingv2.ExternalMetricSourceType:
prefix = "external"
}
if isCron {
prefix = "cron"
}
return fmt.Sprintf("crane_%s_%s", prefix, name)
}
// GetExpressionQuery return metric query from annotation by metricName
func GetExpressionQuery(metricName string, annotations map[string]string) string {
for k, v := range annotations {
if strings.HasPrefix(k, known.EffectiveHorizontalPodAutoscalerExternalMetricsAnnotationPrefix) {
compileRegex := regexp.MustCompile(fmt.Sprintf("%s(.*)", known.EffectiveHorizontalPodAutoscalerExternalMetricsAnnotationPrefix))
matchArr := compileRegex.FindStringSubmatch(k)
if len(matchArr) == 2 && matchArr[1][1:] == metricName {
return v
}
}
}
return ""
}
......@@ -2,11 +2,13 @@ package utils
import (
"context"
"fmt"
"time"
"k8s.io/klog/v2"
"github.com/gocrane/api/analysis/v1alpha1"
predictionapi "github.com/gocrane/api/prediction/v1alpha1"
"github.com/gocrane/crane/pkg/common"
"github.com/gocrane/crane/pkg/metricnaming"
......@@ -42,3 +44,26 @@ func QueryPredictedValues(predictor prediction.Interface, caller string, pConfig
func QueryPredictedValuesOnce(recommendation *v1alpha1.Recommendation, predictor prediction.Interface, caller string, pConfig *config.Config, namer metricnaming.MetricNamer) ([]*common.TimeSeries, error) {
return predictor.QueryRealtimePredictedValuesOnce(context.TODO(), namer, *pConfig)
}
func GetReadyPredictionMetric(metric string, prediction *predictionapi.TimeSeriesPrediction) (*predictionapi.MetricTimeSeries, error) {
var targetMetricStatus *predictionapi.PredictionMetricStatus
for _, metricStatus := range prediction.Status.PredictionMetrics {
if metricStatus.ResourceIdentifier == metric && len(metricStatus.Prediction) == 1 {
targetMetricStatus = &metricStatus
}
}
if targetMetricStatus == nil {
return nil, fmt.Errorf("TimeSeries is empty, metric name %s", metric)
}
if !targetMetricStatus.Ready {
return nil, fmt.Errorf("TimeSeries is not ready, metric name %s", metric)
}
if len(targetMetricStatus.Prediction) != 1 {
return nil, fmt.Errorf("TimeSeries data length is unexpected: %d", len(targetMetricStatus.Prediction))
}
return targetMetricStatus.Prediction[0], nil
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment