Commit 4ee6d4b5 authored by Ankit Anand's avatar Ankit Anand
Browse files

added spansAggregate API implementation for clickhouse

parent a7836c26
No related merge requests found
Showing with 105 additions and 5 deletions
+105 -5
......@@ -614,6 +614,101 @@ func (r *ClickHouseReader) SearchSpansAggregate(ctx context.Context, queryParams
spanSearchAggregatesResponseItems := []model.SpanSearchAggregatesResponseItem{}
aggregation_query := ""
if queryParams.Dimension == "duration" {
switch queryParams.AggregationOption {
case "p50":
aggregation_query = " quantile(0.50)(durationNano) as value "
break
case "p95":
aggregation_query = " quantile(0.95)(durationNano) as value "
break
case "p99":
aggregation_query = " quantile(0.99)(durationNano) as value "
break
}
} else if queryParams.Dimension == "calls" {
aggregation_query = " count(*) as value "
}
query := fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s FROM %s WHERE timestamp >= ? AND timestamp <= ?", queryParams.StepSeconds/60, aggregation_query, r.indexTable)
args := []interface{}{strconv.FormatInt(queryParams.Start.UnixNano(), 10), strconv.FormatInt(queryParams.End.UnixNano(), 10)}
if len(queryParams.ServiceName) != 0 {
query = query + " AND serviceName = ?"
args = append(args, queryParams.ServiceName)
}
if len(queryParams.OperationName) != 0 {
query = query + " AND name = ?"
args = append(args, queryParams.OperationName)
}
if len(queryParams.Kind) != 0 {
query = query + " AND kind = ?"
args = append(args, queryParams.Kind)
}
if len(queryParams.MinDuration) != 0 {
query = query + " AND durationNano >= ?"
args = append(args, queryParams.MinDuration)
}
if len(queryParams.MaxDuration) != 0 {
query = query + " AND durationNano <= ?"
args = append(args, queryParams.MaxDuration)
}
for _, item := range queryParams.Tags {
if item.Key == "error" && item.Value == "true" {
query = query + " AND ( has(tags, 'error:true') OR statusCode>=500)"
continue
}
if item.Operator == "equals" {
query = query + " AND has(tags, ?)"
args = append(args, fmt.Sprintf("%s:%s", item.Key, item.Value))
} else if item.Operator == "contains" {
query = query + " AND tagsValues[indexOf(tagsKeys, ?)] ILIKE ?"
args = append(args, item.Key)
args = append(args, fmt.Sprintf("%%%s%%", item.Value))
} else if item.Operator == "isnotnull" {
query = query + " AND has(tagsKeys, ?)"
args = append(args, item.Key)
} else {
return nil, fmt.Errorf("Tag Operator %s not supported", item.Operator)
}
}
query = query + " GROUP BY time ORDER BY time"
err := r.db.Select(&spanSearchAggregatesResponseItems, query, args...)
zap.S().Info(query)
if err != nil {
zap.S().Debug("Error in processing sql query: ", err)
return nil, fmt.Errorf("Error in processing sql query")
}
for i, _ := range spanSearchAggregatesResponseItems {
timeObj, _ := time.Parse(time.RFC3339Nano, spanSearchAggregatesResponseItems[i].Time)
spanSearchAggregatesResponseItems[i].Timestamp = int64(timeObj.UnixNano())
spanSearchAggregatesResponseItems[i].Time = ""
if queryParams.AggregationOption == "rate_per_sec" {
spanSearchAggregatesResponseItems[i].Value = float32(spanSearchAggregatesResponseItems[i].Value) / float32(queryParams.StepSeconds)
}
}
return spanSearchAggregatesResponseItems, nil
}
......@@ -16,7 +16,7 @@ var allowedDimesions = []string{"calls", "duration"}
var allowedAggregations = map[string][]string{
"calls": []string{"count", "rate_per_sec"},
"duration": []string{"avg", "p50", "p90", "p99"},
"duration": []string{"avg", "p50", "p95", "p99"},
}
func parseGetTopEndpointsRequest(r *http.Request) (*model.GetTopEndpointsParams, error) {
......@@ -234,6 +234,8 @@ func parseSearchSpanAggregatesRequest(r *http.Request) (*model.SpanSearchAggrega
}
params := &model.SpanSearchAggregatesParams{
Start: startTime,
End: endTime,
Intervals: fmt.Sprintf("%s/%s", startTimeStr, endTimeStr),
GranOrigin: startTimeStr,
GranPeriod: granPeriod,
......
......@@ -240,10 +240,10 @@ func SearchSpansAggregate(client *godruid.Client, queryParams *model.SpanSearchA
postAggregationString := `{"type":"quantilesDoublesSketchToQuantile","name":"value","field":{"type":"fieldAccess","fieldName":"quantile_agg"},"fraction":0.5}`
postAggregation = godruid.PostAggRawJson(postAggregationString)
break
case "p90":
case "p95":
aggregationString := `{ "type": "quantilesDoublesSketch", "fieldName": "QuantileDuration", "name": "quantile_agg", "k": 128}`
aggregation = godruid.AggRawJson(aggregationString)
postAggregationString := `{"type":"quantilesDoublesSketchToQuantile","name":"value","field":{"type":"fieldAccess","fieldName":"quantile_agg"},"fraction":0.9}`
postAggregationString := `{"type":"quantilesDoublesSketchToQuantile","name":"value","field":{"type":"fieldAccess","fieldName":"quantile_agg"},"fraction":0.95}`
postAggregation = godruid.PostAggRawJson(postAggregationString)
break
......
......@@ -66,6 +66,8 @@ type SpanSearchAggregatesParams struct {
MinDuration string
MaxDuration string
Tags []TagQuery
Start *time.Time
End *time.Time
GranOrigin string
GranPeriod string
Intervals string
......
......@@ -164,6 +164,7 @@ type ServiceMapDependencyResponseItem struct {
}
type SpanSearchAggregatesResponseItem struct {
Timestamp int64 `json:"timestamp"`
Value float32 `json:"value"`
Timestamp int64 `json:"timestamp,omitempty" db:"timestamp" `
Time string `json:"time,omitempty" db:"time"`
Value float32 `json:"value,omitempty" db:"value"`
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment