Unverified Commit 1ebf6458 authored by Amol Umbark's avatar Amol Umbark Committed by GitHub
Browse files

Alerts: Test Notifications in Rules

parent 80c96af5
Showing with 202 additions and 130 deletions
+202 -130
......@@ -304,12 +304,14 @@ func (aH *APIHandler) RegisterRoutes(router *mux.Router) {
router.HandleFunc("/api/v1/channels/{id}", AdminAccess(aH.deleteChannel)).Methods(http.MethodDelete)
router.HandleFunc("/api/v1/channels", EditAccess(aH.createChannel)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/testChannel", EditAccess(aH.testChannel)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/rules", ViewAccess(aH.listRules)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/rules/{id}", ViewAccess(aH.getRule)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/rules", EditAccess(aH.createRule)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/rules/{id}", EditAccess(aH.editRule)).Methods(http.MethodPut)
router.HandleFunc("/api/v1/rules/{id}", EditAccess(aH.deleteRule)).Methods(http.MethodDelete)
router.HandleFunc("/api/v1/rules/{id}", EditAccess(aH.patchRule)).Methods(http.MethodPatch)
router.HandleFunc("/api/v1/testRule", EditAccess(aH.testRule)).Methods(http.MethodPost)
router.HandleFunc("/api/v1/dashboards", ViewAccess(aH.getDashboards)).Methods(http.MethodGet)
router.HandleFunc("/api/v1/dashboards", EditAccess(aH.createDashboards)).Methods(http.MethodPost)
......@@ -771,6 +773,32 @@ func (aH *APIHandler) createDashboards(w http.ResponseWriter, r *http.Request) {
}
// testRule handles POST /api/v1/testRule. It reads the raw rule definition
// from the request body, asks the rule manager to evaluate it once and send a
// test notification, and responds with the number of alerts found.
//
// A body that cannot be read is reported as a bad-data error; any error
// returned by the rule manager is passed to the client unchanged.
func (aH *APIHandler) testRule(w http.ResponseWriter, r *http.Request) {
	defer r.Body.Close()

	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		// Errorf needs an explicit verb, otherwise err is rendered as %!(EXTRA ...).
		zap.S().Errorf("Error in getting req body in test rule API: %v", err)
		respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
		return
	}

	// Derive the deadline from the request context so that the evaluation is
	// abandoned when the client disconnects, and cap it at one minute.
	ctx, cancel := context.WithTimeout(r.Context(), 1*time.Minute)
	defer cancel()

	alertCount, apiErr := aH.ruleManager.TestNotification(ctx, string(body))
	if apiErr != nil {
		respondError(w, apiErr, nil)
		return
	}

	aH.respond(w, map[string]interface{}{
		"alertCount": alertCount,
		"message":    "notification sent",
	})
}
func (aH *APIHandler) deleteRule(w http.ResponseWriter, r *http.Request) {
id := mux.Vars(r)["id"]
......@@ -963,6 +991,7 @@ func (aH *APIHandler) createRule(w http.ResponseWriter, r *http.Request) {
aH.respond(w, "rule successfully added")
}
func (aH *APIHandler) queryRangeMetricsFromClickhouse(w http.ResponseWriter, r *http.Request) {
}
......
......@@ -23,6 +23,8 @@ const (
// AlertForStateMetricName is the metric name for 'for' state of alert.
alertForStateMetricName = "ALERTS_FOR_STATE"
TestAlertPostFix = "_TEST_ALERT"
)
type RuleType string
......
......@@ -18,6 +18,16 @@ import (
// this file contains api request and responses to be
// served over http
// newApiErrorInternal wraps err in an api error object whose type is
// model.ErrorInternal.
func newApiErrorInternal(err error) *model.ApiError {
	apiErr := new(model.ApiError)
	apiErr.Typ = model.ErrorInternal
	apiErr.Err = err
	return apiErr
}
// newApiErrorBadData wraps err in an api error object whose type is
// model.ErrorBadData (bad request).
func newApiErrorBadData(err error) *model.ApiError {
	apiErr := new(model.ApiError)
	apiErr.Typ = model.ErrorBadData
	apiErr.Err = err
	return apiErr
}
// PostableRule is used to create alerting rule from HTTP api
type PostableRule struct {
Alert string `yaml:"alert,omitempty" json:"alert,omitempty"`
......
......@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/google/uuid"
"sort"
"strconv"
"strings"
......@@ -19,6 +20,8 @@ import (
// opentracing "github.com/opentracing/opentracing-go"
am "go.signoz.io/query-service/integrations/alertManager"
"go.signoz.io/query-service/model"
"go.signoz.io/query-service/utils/labels"
)
// namespace for prom metrics
......@@ -38,7 +41,6 @@ func prepareTaskName(ruleId interface{}) string {
default:
return fmt.Sprintf("%v-groupname", ruleId)
}
}
// ManagerOptions bundles options for the Manager.
......@@ -382,6 +384,7 @@ func (m *Manager) prepareTask(acquireLock bool, r *PostableRule, taskName string
tr, err := NewThresholdRule(
ruleId,
r,
ThresholdRuleOpts{},
)
if err != nil {
......@@ -403,6 +406,7 @@ func (m *Manager) prepareTask(acquireLock bool, r *PostableRule, taskName string
ruleId,
r,
log.With(m.logger, "alert", r.Alert),
PromRuleOpts{},
)
if err != nil {
......@@ -683,3 +687,84 @@ func (m *Manager) PatchRule(ruleStr string, ruleId string) (*GettableRule, error
return &response, nil
}
// TestNotification prepares a dummy rule for given rule parameters and
// sends a test notification. It returns the alert count and an error (if any).
//
// ruleStr is the raw rule definition as accepted by ParsePostableRule. The
// rule is never registered with the manager: it is built, evaluated once at
// the current UTC time and its alerts are sent immediately.
//
// Errors:
//   - bad data: the rule cannot be parsed, a threshold rule has no
//     condition/target, the rule cannot be constructed, or the rule type is
//     neither threshold nor promql.
//   - internal: evaluation fails or yields an unexpected result type.
func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *model.ApiError) {
	parsedRule, errs := ParsePostableRule([]byte(ruleStr))
	if len(errs) > 0 {
		zap.S().Errorf("msg: failed to parse rule from request:\t error: %v", errs)
		return 0, newApiErrorBadData(errs[0])
	}

	alertname := parsedRule.Alert
	if alertname == "" {
		// alertname is not mandatory for testing, so picking
		// a random string here
		alertname = uuid.New().String()
	}

	// append name to indicate this is test alert
	parsedRule.Alert = fmt.Sprintf("%s%s", alertname, TestAlertPostFix)

	var rule Rule
	var err error

	switch parsedRule.RuleType {
	case RuleTypeThreshold:
		// The summary below dereferences the target, so reject rules that
		// do not carry one instead of panicking on a nil pointer.
		if parsedRule.RuleCondition == nil || parsedRule.RuleCondition.Target == nil {
			return 0, newApiErrorBadData(fmt.Errorf("a rule condition with a target is required to test a threshold rule"))
		}

		// Requests may omit labels/annotations; writing to a nil map panics.
		if parsedRule.Labels == nil {
			parsedRule.Labels = map[string]string{}
		}
		if parsedRule.Annotations == nil {
			parsedRule.Annotations = map[string]string{}
		}

		// add special labels for test alerts
		summary := fmt.Sprintf("The rule threshold is set to %.4f, and the observed metric value is {{$value}}.", *parsedRule.RuleCondition.Target)
		parsedRule.Labels[labels.AlertAdditionalInfoLabel] = summary
		parsedRule.Annotations[labels.AlertSummaryLabel] = summary
		parsedRule.Labels[labels.RuleSourceLabel] = ""
		parsedRule.Labels[labels.AlertRuleIdLabel] = ""

		// create a threshold rule
		rule, err = NewThresholdRule(
			alertname,
			parsedRule,
			ThresholdRuleOpts{
				SendUnmatched: true,
				SendAlways:    true,
			},
		)
		if err != nil {
			zap.S().Errorf("msg: failed to prepare a new threshold rule for test:\t error: %v", err)
			return 0, newApiErrorBadData(err)
		}

	case RuleTypeProm:
		// create promql rule
		rule, err = NewPromRule(
			alertname,
			parsedRule,
			log.With(m.logger, "alert", alertname),
			PromRuleOpts{
				SendAlways: true,
			},
		)
		if err != nil {
			zap.S().Errorf("msg: failed to prepare a new promql rule for test:\t error: %v", err)
			return 0, newApiErrorBadData(err)
		}

	default:
		return 0, newApiErrorBadData(fmt.Errorf("failed to derive ruletype with given information"))
	}

	// set timestamp to current utc time
	ts := time.Now().UTC()

	count, err := rule.Eval(ctx, ts, m.opts.Queriers)
	if err != nil {
		zap.S().Warn("msg:", "Evaluating rule failed", "\t rule:", rule, "\t err: ", err)
		return 0, newApiErrorInternal(fmt.Errorf("rule evaluation failed"))
	}

	// Eval returns an untyped result; guard the assertion so that an
	// unexpected type becomes an API error rather than a panic.
	alertsFound, ok := count.(int)
	if !ok {
		zap.S().Errorf("msg: unexpected result type from rule evaluation:\t type: %T", count)
		return 0, newApiErrorInternal(fmt.Errorf("rule evaluation returned an unexpected result"))
	}

	rule.SendAlerts(ctx, ts, 0, time.Minute, m.prepareNotifyFunc())

	return alertsFound, nil
}
......@@ -18,6 +18,12 @@ import (
yaml "gopkg.in/yaml.v2"
)
// PromRuleOpts carries optional behaviour switches for a PromRule. It is
// passed to NewPromRule and stored on the rule.
type PromRuleOpts struct {
// SendAlways makes SendAlerts send every active alert irrespective of
// resendDelay or other params (the needsSending check is bypassed).
SendAlways bool
}
type PromRule struct {
id string
name string
......@@ -43,12 +49,14 @@ type PromRule struct {
active map[uint64]*Alert
logger log.Logger
opts PromRuleOpts
}
func NewPromRule(
id string,
postableRule *PostableRule,
logger log.Logger,
opts PromRuleOpts,
) (*PromRule, error) {
if postableRule.RuleCondition == nil {
......@@ -69,13 +77,20 @@ func NewPromRule(
health: HealthUnknown,
active: map[uint64]*Alert{},
logger: logger,
opts: opts,
}
if int64(p.evalWindow) == 0 {
p.evalWindow = 5 * time.Minute
}
query, err := p.getPqlQuery()
if err != nil {
// can not generate a valid prom QL query
return nil, err
}
zap.S().Info("msg:", "creating new alerting rule", "\t name:", p.name, "\t condition:", p.ruleCondition.String())
zap.S().Info("msg:", "creating new alerting rule", "\t name:", p.name, "\t condition:", p.ruleCondition.String(), "\t query:", query)
return &p, nil
}
......@@ -172,24 +187,6 @@ func (r *PromRule) sample(alert *Alert, ts time.Time) pql.Sample {
return s
}
// forStateSample builds the ALERTS_FOR_STATE sample for an alert: the rule
// labels overlaid with the alert's own labels, stamped with ts and value v.
func (r *PromRule) forStateSample(alert *Alert, ts time.Time, v float64) pql.Sample {
	builder := plabels.NewBuilder(r.labels)

	// alert labels take precedence over the rule's labels
	for _, lbl := range alert.Labels.(plabels.Labels) {
		builder.Set(lbl.Name, lbl.Value)
	}

	builder.Set(plabels.MetricName, alertForStateMetricName)
	builder.Set(plabels.AlertName, r.name)

	return pql.Sample{
		Metric: builder.Labels(),
		Point:  pql.Point{T: timestamp.FromTime(ts), V: v},
	}
}
// GetEvaluationDuration returns the time in seconds it took to evaluate the alerting rule.
func (r *PromRule) GetEvaluationDuration() time.Duration {
r.mtx.Lock()
......@@ -265,7 +262,7 @@ func (r *PromRule) ForEachActiveAlert(f func(*Alert)) {
func (r *PromRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
alerts := []*Alert{}
r.ForEachActiveAlert(func(alert *Alert) {
if alert.needsSending(ts, resendDelay) {
if r.opts.SendAlways || alert.needsSending(ts, resendDelay) {
alert.LastSentAt = ts
// Allow for two Eval or Alertmanager send failures.
delta := resendDelay
......@@ -289,7 +286,6 @@ func (r *PromRule) getPqlQuery() (string, error) {
if query == "" {
return query, fmt.Errorf("a promquery needs to be set for this rule to function")
}
if r.ruleCondition.Target != nil && r.ruleCondition.CompareOp != CompareOpNone {
query = fmt.Sprintf("%s %s %f", query, ResolveCompareOp(r.ruleCondition.CompareOp), *r.ruleCondition.Target)
return query, nil
......@@ -321,7 +317,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (
defer r.mtx.Unlock()
resultFPs := map[uint64]struct{}{}
var vec pql.Vector
var alerts = make(map[uint64]*Alert, len(res))
for _, smpl := range res {
......@@ -358,6 +354,7 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (
for _, l := range r.labels {
lb.Set(l.Name, expand(l.Value))
}
lb.Set(qslabels.AlertNameLabel, r.Name())
lb.Set(qslabels.AlertRuleIdLabel, r.ID())
lb.Set(qslabels.RuleSourceLabel, r.GeneratorURL())
......@@ -429,8 +426,8 @@ func (r *PromRule) Eval(ctx context.Context, ts time.Time, queriers *Queriers) (
}
r.health = HealthGood
r.lastError = err
return vec, nil
return len(r.active), nil
}
func (r *PromRule) String() string {
......
......@@ -6,7 +6,6 @@ import (
"github.com/go-kit/log"
opentracing "github.com/opentracing/opentracing-go"
plabels "github.com/prometheus/prometheus/pkg/labels"
pql "github.com/prometheus/prometheus/promql"
"go.uber.org/zap"
"sort"
"sync"
......@@ -313,7 +312,6 @@ func (g *PromRuleTask) CopyState(fromTask Task) error {
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *PromRuleTask) Eval(ctx context.Context, ts time.Time) {
zap.S().Info("promql rule task:", g.name, "\t eval started at:", ts)
var samplesTotal float64
for i, rule := range g.rules {
if rule == nil {
continue
......@@ -336,7 +334,7 @@ func (g *PromRuleTask) Eval(ctx context.Context, ts time.Time) {
rule.SetEvaluationTimestamp(t)
}(time.Now())
data, err := rule.Eval(ctx, ts, g.opts.Queriers)
_, err := rule.Eval(ctx, ts, g.opts.Queriers)
if err != nil {
rule.SetHealth(HealthBad)
rule.SetLastError(err)
......@@ -350,21 +348,8 @@ func (g *PromRuleTask) Eval(ctx context.Context, ts time.Time) {
//}
return
}
vector := data.(pql.Vector)
samplesTotal += float64(len(vector))
rule.SendAlerts(ctx, ts, g.opts.ResendDelay, g.frequency, g.notify)
seriesReturned := make(map[string]plabels.Labels, len(g.seriesInPreviousEval[i]))
defer func() {
g.seriesInPreviousEval[i] = seriesReturned
}()
for _, s := range vector {
seriesReturned[s.Metric.String()] = s.Metric
}
}(i, rule)
}
}
......@@ -14,17 +14,15 @@ import (
// RuleTask holds a rule (with composite queries)
// and evaluates the rule at a given frequency
type RuleTask struct {
name string
file string
frequency time.Duration
rules []Rule
seriesInPreviousEval []map[string]labels.Labels // One per Rule.
staleSeries []labels.Labels
opts *ManagerOptions
mtx sync.Mutex
evaluationDuration time.Duration
evaluationTime time.Duration
lastEvaluation time.Time
name string
file string
frequency time.Duration
rules []Rule
opts *ManagerOptions
mtx sync.Mutex
evaluationDuration time.Duration
evaluationTime time.Duration
lastEvaluation time.Time
markStale bool
done chan struct{}
......@@ -46,16 +44,15 @@ func newRuleTask(name, file string, frequency time.Duration, rules []Rule, opts
zap.S().Info("msg:", "initiating a new rule task", "\t name:", name, "\t frequency:", frequency)
return &RuleTask{
name: name,
file: file,
pause: false,
frequency: frequency,
rules: rules,
opts: opts,
seriesInPreviousEval: make([]map[string]labels.Labels, len(rules)),
done: make(chan struct{}),
terminated: make(chan struct{}),
notify: notify,
name: name,
file: file,
pause: false,
frequency: frequency,
rules: rules,
opts: opts,
done: make(chan struct{}),
terminated: make(chan struct{}),
notify: notify,
}
}
......@@ -126,24 +123,6 @@ func (g *RuleTask) Run(ctx context.Context) {
tick := time.NewTicker(g.frequency)
defer tick.Stop()
// defer cleanup
defer func() {
if !g.markStale {
return
}
go func(now time.Time) {
for _, rule := range g.seriesInPreviousEval {
for _, r := range rule {
g.staleSeries = append(g.staleSeries, r)
}
}
// That can be garbage collected at this point.
g.seriesInPreviousEval = nil
}(time.Now())
}()
iter()
// let the group iterate and run
......@@ -285,17 +264,15 @@ func (g *RuleTask) CopyState(fromTask Task) error {
ruleMap[nameAndLabels] = append(l, fi)
}
for i, rule := range g.rules {
for _, rule := range g.rules {
nameAndLabels := nameAndLabels(rule)
indexes := ruleMap[nameAndLabels]
if len(indexes) == 0 {
continue
}
fi := indexes[0]
g.seriesInPreviousEval[i] = from.seriesInPreviousEval[fi]
ruleMap[nameAndLabels] = indexes[1:]
// todo(amol): support other rules too here
ar, ok := rule.(*ThresholdRule)
if !ok {
continue
......@@ -310,18 +287,6 @@ func (g *RuleTask) CopyState(fromTask Task) error {
}
}
// Handle deleted and unmatched duplicate rules.
// todo(amol): possibly not needed any more
g.staleSeries = from.staleSeries
for fi, fromRule := range from.rules {
nameAndLabels := nameAndLabels(fromRule)
l := ruleMap[nameAndLabels]
if len(l) != 0 {
for _, series := range from.seriesInPreviousEval[fi] {
g.staleSeries = append(g.staleSeries, series)
}
}
}
return nil
}
......@@ -330,7 +295,6 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
zap.S().Debugf("msg:", "rule task eval started", "\t name:", g.name, "\t start time:", ts)
var samplesTotal float64
for i, rule := range g.rules {
if rule == nil {
continue
......@@ -353,7 +317,7 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
rule.SetEvaluationTimestamp(t)
}(time.Now())
data, err := rule.Eval(ctx, ts, g.opts.Queriers)
_, err := rule.Eval(ctx, ts, g.opts.Queriers)
if err != nil {
rule.SetHealth(HealthBad)
rule.SetLastError(err)
......@@ -368,18 +332,8 @@ func (g *RuleTask) Eval(ctx context.Context, ts time.Time) {
return
}
vector := data.(Vector)
samplesTotal += float64(len(vector))
rule.SendAlerts(ctx, ts, g.opts.ResendDelay, g.frequency, g.notify)
seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
for _, s := range vector {
seriesReturned[s.Metric.String()] = s.Metric
}
g.seriesInPreviousEval[i] = seriesReturned
}(i, rule)
}
}
......@@ -43,11 +43,25 @@ type ThresholdRule struct {
// map of active alerts
active map[uint64]*Alert
opts ThresholdRuleOpts
}
// ThresholdRuleOpts carries optional behaviour switches for a ThresholdRule.
// It is passed to NewThresholdRule and stored on the rule.
type ThresholdRuleOpts struct {
// SendUnmatched keeps observed metric values in the query result
// even if they don't match the rule condition. This is
// useful when testing the rule.
SendUnmatched bool
// SendAlways makes SendAlerts send every active alert irrespective of
// resendDelay or other params (the needsSending check is bypassed).
SendAlways bool
}
func NewThresholdRule(
id string,
p *PostableRule,
opts ThresholdRuleOpts,
) (*ThresholdRule, error) {
if p.RuleCondition == nil {
......@@ -67,6 +81,7 @@ func NewThresholdRule(
preferredChannels: p.PreferredChannels,
health: HealthUnknown,
active: map[uint64]*Alert{},
opts: opts,
}
if int64(t.evalWindow) == 0 {
......@@ -105,6 +120,14 @@ func (r *ThresholdRule) target() *float64 {
return r.ruleCondition.Target
}
// targetVal returns the rule's target threshold as a plain value, or 0 when
// the rule has no condition or the condition has no target.
func (r *ThresholdRule) targetVal() float64 {
	if cond := r.ruleCondition; cond != nil && cond.Target != nil {
		return *cond.Target
	}
	return 0
}
func (r *ThresholdRule) matchType() MatchType {
if r.ruleCondition == nil {
return AtleastOnce
......@@ -188,25 +211,7 @@ func (r *ThresholdRule) sample(alert *Alert, ts time.Time) Sample {
Metric: lb.Labels(),
Point: Point{T: timestamp.FromTime(ts), V: 1},
}
return s
}
// forStateSample builds the ALERTS_FOR_STATE sample for an alert: the rule
// labels overlaid with the alert's own labels, stamped with ts and value v.
func (r *ThresholdRule) forStateSample(alert *Alert, ts time.Time, v float64) Sample {
	builder := labels.NewBuilder(r.labels)

	// alert labels take precedence over the rule's labels
	for _, lbl := range alert.Labels.(labels.Labels) {
		builder.Set(lbl.Name, lbl.Value)
	}

	builder.Set(labels.MetricNameLabel, alertForStateMetricName)
	builder.Set(labels.AlertNameLabel, r.name)

	return Sample{
		Metric: builder.Labels(),
		Point:  Point{T: timestamp.FromTime(ts), V: v},
	}
}
......@@ -283,10 +288,10 @@ func (r *ThresholdRule) ForEachActiveAlert(f func(*Alert)) {
}
func (r *ThresholdRule) SendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
zap.S().Info("msg:", "initiating send alerts (if any)", "\t rule:", r.Name())
zap.S().Info("msg:", "sending alerts", "\t rule:", r.Name())
alerts := []*Alert{}
r.ForEachActiveAlert(func(alert *Alert) {
if alert.needsSending(ts, resendDelay) {
if r.opts.SendAlways || alert.needsSending(ts, resendDelay) {
alert.LastSentAt = ts
// Allow for two Eval or Alertmanager send failures.
delta := resendDelay
......@@ -483,8 +488,9 @@ func (r *ThresholdRule) runChQuery(ctx context.Context, db clickhouse.Conn, quer
zap.S().Debugf("ruleid:", r.ID(), "\t resultmap(potential alerts):", len(resultMap))
for _, sample := range resultMap {
// check alert rule condition before dumping results
if r.CheckCondition(sample.Point.V) {
// check alert rule condition before dumping results; if the SendUnmatched
// option is set then add results irrespective of the condition
if r.opts.SendUnmatched || r.CheckCondition(sample.Point.V) {
result = append(result, sample)
}
}
......@@ -549,7 +555,6 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie
defer r.mtx.Unlock()
resultFPs := map[uint64]struct{}{}
var vec Vector
var alerts = make(map[uint64]*Alert, len(res))
for _, smpl := range res {
......@@ -563,6 +568,7 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie
// who are not used to Go's templating system.
defs := "{{$labels := .Labels}}{{$value := .Value}}"
// utility function to apply go template on labels and annots
expand := func(text string) string {
tmpl := NewTemplateExpander(
......@@ -662,8 +668,8 @@ func (r *ThresholdRule) Eval(ctx context.Context, ts time.Time, queriers *Querie
}
r.health = HealthGood
r.lastError = err
return vec, nil
return len(r.active), nil
}
func (r *ThresholdRule) String() string {
......
......@@ -24,6 +24,10 @@ const (
AlertRuleIdLabel = "ruleId"
RuleSourceLabel = "ruleSource"
RuleThresholdLabel = "threshold"
AlertAdditionalInfoLabel = "additionalInfo"
AlertSummaryLabel = "summary"
)
// Label is a key/value pair of strings.
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment