Unverified Commit a7bb9dac authored by Qi's avatar Qi Committed by GitHub
Browse files

Merge pull request #40 from kitianFresh/feature/tsp-observability-by-collector

add observability for tsp by prometheus collector
parents 85563164 ee264987
Showing with 179 additions and 2 deletions
+179 -2
......@@ -7,8 +7,6 @@ import (
"os"
"strings"
"github.com/gocrane/crane/pkg/controller/noderesource"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
......@@ -26,10 +24,12 @@ import (
"github.com/gocrane/crane/cmd/craned/app/options"
"github.com/gocrane/crane/pkg/controller/analytics"
"github.com/gocrane/crane/pkg/controller/ehpa"
"github.com/gocrane/crane/pkg/controller/noderesource"
"github.com/gocrane/crane/pkg/controller/recommendation"
"github.com/gocrane/crane/pkg/controller/tsp"
"github.com/gocrane/crane/pkg/known"
"github.com/gocrane/crane/pkg/log"
"github.com/gocrane/crane/pkg/metrics"
"github.com/gocrane/crane/pkg/prediction"
"github.com/gocrane/crane/pkg/prediction/dsp"
"github.com/gocrane/crane/pkg/prediction/percentile"
......@@ -107,6 +107,9 @@ func Run(ctx context.Context, opts *options.Options) error {
initializationControllers(ctx, mgr, opts)
log.Logger().Info("Starting crane manager")
// initialization custom collector metrics
initializationMetricCollector(mgr)
if err := mgr.Start(ctx); err != nil {
log.Logger().Error(err, "problem running crane manager")
return err
......@@ -115,6 +118,11 @@ func Run(ctx context.Context, opts *options.Options) error {
return nil
}
func initializationMetricCollector(mgr ctrl.Manager) {
// register as prometheus metric collector
metrics.CustomCollectorRegister(metrics.NewTspMetricCollector(mgr.GetClient()))
}
func initializationWebhooks(mgr ctrl.Manager, opts *options.Options) {
log.Logger().Info(fmt.Sprintf("opts %v", opts))
......
......@@ -30,3 +30,7 @@ func init() {
// Register custom metrics with the global prometheus registry
metrics.Registry.MustRegister(HPAReplicas, EHPAReplicas)
}
func CustomCollectorRegister(collector ...prometheus.Collector) {
metrics.Registry.MustRegister(collector...)
}
package metrics
import (
"context"
"fmt"
"sort"
"strconv"
"strings"
"time"
"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
v1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
predictionapi "github.com/gocrane/api/prediction/v1alpha1"
)
type TspMetricCollector struct {
client.Client
Logger logr.Logger
resourceCpuMetric *prometheus.Desc
resourceMemMetric *prometheus.Desc
}
func NewTspMetricCollector(client client.Client) *TspMetricCollector {
return &TspMetricCollector{
Client: client,
resourceCpuMetric: prometheus.NewDesc(
prometheus.BuildFQName("crane", "prediction", "time_series_prediction_resource_cpu"),
"prediction resource cpu value for TimeSeriesPrediction",
[]string{"targetKind", "targetName", "targetNamespace", "resourceIdentifier", "type", "resourceQuery", "metricQuery", "expressionQuery", "algorithm", "aggregateKey"},
nil,
),
resourceMemMetric: prometheus.NewDesc(
prometheus.BuildFQName("crane", "prediction", "time_series_prediction_resource_memory"),
"prediction resource memory value for TimeSeriesPrediction",
[]string{"targetKind", "targetName", "targetNamespace", "resourceIdentifier", "type", "resourceQuery", "metricQuery", "expressionQuery", "algorithm", "aggregateKey"},
nil,
),
}
}
// Why Implement prometheus collector ?
// Because the time series prediction timestamp is future timestamp, this way can push timestamp to prometheus
// if use prometheus metric instrument by default, prometheus scrape will use its own scrape timestamp, so that the prediction time series maybe has wrong timestamps in prom.
func (c *TspMetricCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- c.resourceCpuMetric
ch <- c.resourceMemMetric
}
func (c *TspMetricCollector) Collect(ch chan<- prometheus.Metric) {
tspList := &predictionapi.TimeSeriesPredictionList{}
err := c.List(context.TODO(), tspList)
if err != nil {
c.Logger.Error(err, "Collect metrics failed")
return
}
for _, tsp := range tspList.Items {
metricList := c.getMetrics(&tsp)
for _, metric := range metricList {
ch <- metric
}
}
}
func (c *TspMetricCollector) getMetrics(tsp *predictionapi.TimeSeriesPrediction) []prometheus.Metric {
var ms []prometheus.Metric
pmMap := map[string]predictionapi.PredictionMetric{}
for _, pm := range tsp.Spec.PredictionMetrics {
pmMap[pm.ResourceIdentifier] = pm
}
//* collected metric "crane_prediction_time_series_prediction_metric" { label:<name:"aggregateKey" value:"nodes-mem#instance=192.168.56.166:9100" > label:<name:"algorithm" value:"percentile" > label:<name:"expressionQuery" value:"" > label:<name:"rawQuery" value:"sum(node_memory_MemTotal_bytes{} - node_memory_MemAvailable_bytes{}) by (instance)" > label:<name:"resourceIdentifier" value:"nodes-mem" > label:<name:"resourceQuery" value:"" > label:<name:"targetKind" value:"Node" > label:<name:"targetName" value:"192.168.56.166" > label:<name:"targetNamespace" value:"" > label:<name:"type" value:"RawQuery" > gauge:<value:1.82784510645e+06 > timestamp_ms:1639466220000 } was collected before with the same name and label values
for _, metricStatus := range tsp.Status.PredictionMetrics {
outputMetrics := c.computePredictionMetric(tsp, pmMap, metricStatus)
ms = append(ms, outputMetrics...)
}
return ms
}
func (c *TspMetricCollector) computePredictionMetric(tsp *predictionapi.TimeSeriesPrediction, pmMap map[string]predictionapi.PredictionMetric, status predictionapi.PredictionMetricStatus) []prometheus.Metric {
var ms []prometheus.Metric
now := time.Now().Unix()
metricConf := pmMap[status.ResourceIdentifier]
resourceQuery := ""
metricQuery := ""
expressionQuery := ""
if metricConf.ResourceQuery != nil {
resourceQuery = metricConf.ResourceQuery.String()
}
if metricConf.MetricQuery != nil {
metricQuery = metricSelectorToQueryExpr(metricConf.MetricQuery)
}
if metricConf.ExpressionQuery != nil {
expressionQuery = metricConf.ExpressionQuery.Expression
}
for _, data := range status.Prediction {
key := AggregateSignalKey(status.ResourceIdentifier, data.Labels)
labelValues := []string{
tsp.Spec.TargetRef.Kind,
tsp.Spec.TargetRef.Name,
tsp.Spec.TargetRef.Namespace,
status.ResourceIdentifier,
string(metricConf.Type),
resourceQuery,
metricQuery,
expressionQuery,
string(metricConf.Algorithm.AlgorithmType),
key,
}
samples := data.Samples
sort.Slice(samples, func(i, j int) bool {
return samples[i].Timestamp < samples[j].Timestamp
})
// just one timestamp point, because prometheus collector will hash the label values, same label values is not valid
for _, sample := range samples {
if sample.Timestamp >= now {
ts := time.Unix(sample.Timestamp, 0)
value, err := strconv.ParseFloat(sample.Value, 64)
if err != nil {
c.Logger.Error(err, "Failed to parse sample value", "value", value)
continue
}
// only collect resource query cpu or memory now.
if resourceQuery == v1.ResourceCPU.String() {
s := prometheus.NewMetricWithTimestamp(ts, prometheus.MustNewConstMetric(c.resourceCpuMetric, prometheus.GaugeValue, value, labelValues...))
ms = append(ms, s)
} else if resourceQuery == v1.ResourceMemory.String() {
s := prometheus.NewMetricWithTimestamp(ts, prometheus.MustNewConstMetric(c.resourceMemMetric, prometheus.GaugeValue, value, labelValues...))
ms = append(ms, s)
}
break
}
}
}
return ms
}
func AggregateSignalKey(id string, labels []predictionapi.Label) string {
labelSet := make([]string, 0, len(labels)+1)
for _, label := range labels {
labelSet = append(labelSet, label.Name+"="+label.Value)
}
sort.Strings(labelSet)
return id + "#" + strings.Join(labelSet, ",")
}
func metricSelectorToQueryExpr(m *predictionapi.MetricQuery) string {
conditions := make([]string, 0, len(m.QueryConditions))
for _, cond := range m.QueryConditions {
values := make([]string, 0, len(cond.Value))
for _, val := range cond.Value {
values = append(values, val)
}
sort.Strings(values)
conditions = append(conditions, fmt.Sprintf("%s%s[%s]", cond.Key, cond.Operator, strings.Join(values, ",")))
}
sort.Strings(conditions)
return fmt.Sprintf("%s{%s}", m.MetricName, strings.Join(conditions, ","))
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment