Commit 925c258e authored by Ankit Nayan
Browse files

chore: alerts WIP

parent 7e95837f
Showing with 372 additions and 16 deletions
+372 -16
......@@ -2,23 +2,41 @@ package clickhouseReader
import (
"context"
"errors"
"crypto/md5"
"encoding/json"
"flag"
"fmt"
"net"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/pkg/errors"
_ "github.com/ClickHouse/clickhouse-go"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/jmoiron/sqlx"
"github.com/oklog/oklog/pkg/group"
"github.com/prometheus/client_golang/prometheus"
promModel "github.com/prometheus/common/model"
"github.com/prometheus/common/promlog"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/util/stats"
"github.com/prometheus/prometheus/util/strutil"
"github.com/prometheus/tsdb"
"go.signoz.io/query-service/constants"
"go.signoz.io/query-service/model"
......@@ -95,18 +113,226 @@ func NewReader() *ClickHouseReader {
remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), startTime, time.Duration(1*time.Minute))
filename := flag.String("config", "./config/prometheus.yml", "(prometheus config to read metrics)")
// conf, err := config.LoadFile(*filename)
// if err != nil {
// zap.S().Error("couldn't load configuration (--config.file=%q): %v", filename, err)
// }
// err = remoteStorage.ApplyConfig(conf)
// if err != nil {
// zap.S().Error("Error in remoteStorage.ApplyConfig: ", err)
// }
cfg := struct {
configFile string
localStoragePath string
notifier notifier.Options
notifierTimeout promModel.Duration
forGracePeriod promModel.Duration
outageTolerance promModel.Duration
resendDelay promModel.Duration
tsdb tsdb.Options
lookbackDelta promModel.Duration
webTimeout promModel.Duration
queryTimeout promModel.Duration
queryConcurrency int
queryMaxSamples int
RemoteFlushDeadline promModel.Duration
prometheusURL string
logLevel promlog.AllowedLevel
}{
notifier: notifier.Options{
Registerer: prometheus.DefaultRegisterer,
},
}
cfg.configFile = *flag.String("config", "./config/prometheus.yml", "(prometheus config to read metrics)")
flag.Parse()
conf, err := config.LoadFile(*filename)
// fanoutStorage := remoteStorage
fanoutStorage := storage.NewFanout(logger, remoteStorage)
localStorage := remoteStorage
cfg.notifier.QueueCapacity = 10000
cfg.notifierTimeout = promModel.Duration(time.Duration.Seconds(10))
notifier := notifier.NewManager(&cfg.notifier, log.With(logger, "component", "notifier"))
// notifier.ApplyConfig(conf)
ExternalURL, err := computeExternalURL("", "0.0.0.0:9090")
if err != nil {
zap.S().Error("couldn't load configuration (--config.file=%q): %v", filename, err)
fmt.Fprintln(os.Stderr, errors.Wrapf(err, "parse external URL %q", ExternalURL.String()))
os.Exit(2)
}
cfg.outageTolerance = promModel.Duration(time.Duration.Hours(1))
cfg.forGracePeriod = promModel.Duration(time.Duration.Minutes(10))
cfg.resendDelay = promModel.Duration(time.Duration.Minutes(1))
ruleManager := rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
TSDB: localStorage,
QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage),
NotifyFunc: sendAlerts(notifier, ExternalURL.String()),
Context: context.Background(),
ExternalURL: ExternalURL,
Registerer: prometheus.DefaultRegisterer,
Logger: log.With(logger, "component", "rule manager"),
OutageTolerance: time.Duration(cfg.outageTolerance),
ForGracePeriod: time.Duration(cfg.forGracePeriod),
ResendDelay: time.Duration(cfg.resendDelay),
})
ctxNotify, cancelNotify := context.WithCancel(context.Background())
discoveryManagerNotify := discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), discovery.Name("notify"))
reloaders := []func(cfg *config.Config) error{
remoteStorage.ApplyConfig,
// The Scrape and notifier managers need to reload before the Discovery manager as
// they need to read the most updated config when receiving the new targets list.
notifier.ApplyConfig,
func(cfg *config.Config) error {
c := make(map[string]sd_config.ServiceDiscoveryConfig)
for _, v := range cfg.AlertingConfig.AlertmanagerConfigs {
// AlertmanagerConfigs doesn't hold a unique identifier so we use the config hash as the identifier.
b, err := json.Marshal(v)
if err != nil {
return err
}
c[fmt.Sprintf("%x", md5.Sum(b))] = v.ServiceDiscoveryConfig
}
return discoveryManagerNotify.ApplyConfig(c)
},
func(cfg *config.Config) error {
// Get all rule files matching the configuration paths.
var files []string
for _, pat := range cfg.RuleFiles {
fs, err := filepath.Glob(pat)
if err != nil {
// The only error can be a bad pattern.
return fmt.Errorf("error retrieving rule files for %s: %s", pat, err)
}
files = append(files, fs...)
}
return ruleManager.Update(time.Duration(cfg.GlobalConfig.EvaluationInterval), files)
},
}
err = remoteStorage.ApplyConfig(conf)
if err != nil {
zap.S().Error("Error in remoteStorage.ApplyConfig: ", err)
// ruleManager.Run()
// defer ruleManager.Stop()
// err = discoveryManagerNotify.Run()
// if err != nil {
// zap.S().Error("Error in discoveryManagerNotify.Run()")
// }
// sync.Once is used to make sure we can close the channel at different execution stages(SIGTERM or when the config is loaded).
type closeOnce struct {
C chan struct{}
once sync.Once
Close func()
}
// Wait until the server is ready to handle reloading.
reloadReady := &closeOnce{
C: make(chan struct{}),
}
reloadReady.Close = func() {
reloadReady.once.Do(func() {
close(reloadReady.C)
})
}
var g group.Group
{
// Notify discovery manager.
g.Add(
func() error {
err := discoveryManagerNotify.Run()
level.Info(logger).Log("msg", "Notify discovery manager stopped")
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping notify discovery manager...")
cancelNotify()
},
)
}
{
// Initial configuration loading.
cancel := make(chan struct{})
g.Add(
func() error {
// select {
// case <-dbOpen:
// break
// // In case a shutdown is initiated before the dbOpen is released
// case <-cancel:
// reloadReady.Close()
// return nil
// }
if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil {
return fmt.Errorf("error loading config from %q: %s", cfg.configFile, err)
}
reloadReady.Close()
<-cancel
return nil
},
func(err error) {
close(cancel)
},
)
}
{
// Rule manager.
// TODO(krasi) refactor ruleManager.Run() to be blocking to avoid using an extra blocking channel.
cancel := make(chan struct{})
g.Add(
func() error {
<-reloadReady.C
ruleManager.Run()
<-cancel
return nil
},
func(err error) {
ruleManager.Stop()
close(cancel)
},
)
}
{
// Notifier.
// Calling notifier.Stop() before ruleManager.Stop() will cause a panic if the ruleManager isn't running,
// so keep this interrupt after the ruleManager.Stop().
g.Add(
func() error {
// When the notifier manager receives a new targets list
// it needs to read a valid config for each job.
// It depends on the config being in sync with the discovery manager
// so we wait until the config is fully loaded.
<-reloadReady.C
notifier.Run(discoveryManagerNotify.SyncCh())
level.Info(logger).Log("msg", "Notifier manager stopped")
return nil
},
func(err error) {
notifier.Stop()
},
)
}
if err := g.Run(); err != nil {
level.Error(logger).Log("err", err)
os.Exit(1)
}
// notifier.Run(discoveryManagerNotify.SyncCh())
// defer notifier.Stop()
return &ClickHouseReader{
db: db,
operationsTable: options.primary.OperationsTable,
......@@ -117,6 +343,92 @@ func NewReader() *ClickHouseReader {
}
}
// reloadConfig loads the configuration file at filename and hands the parsed
// config to every reloader in rls, in order.
//
// A failure to load the file is returned immediately. A failing reloader is
// logged and does not stop the remaining reloaders from running; if at least
// one of them failed, a single summary error is returned at the end.
func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) (err error) {
	level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)

	loaded, loadErr := config.LoadFile(filename)
	if loadErr != nil {
		return fmt.Errorf("couldn't load configuration (--config.file=%q): %v", filename, loadErr)
	}

	anyFailed := false
	for i := range rls {
		applyErr := rls[i](loaded)
		if applyErr == nil {
			continue
		}
		// Keep going so every reloader gets a chance to apply the config.
		level.Error(logger).Log("msg", "Failed to apply configuration", "err", applyErr)
		anyFailed = true
	}

	if anyFailed {
		return fmt.Errorf("one or more errors occurred while applying the new configuration (--config.file=%q)", filename)
	}

	level.Info(logger).Log("msg", "Completed loading of configuration file", "filename", filename)
	return nil
}
// startsOrEndsWithQuote reports whether s has a single or double quote
// character as its first or last character.
func startsOrEndsWithQuote(s string) bool {
	for _, quote := range [...]string{"\"", "'"} {
		if strings.HasPrefix(s, quote) || strings.HasSuffix(s, quote) {
			return true
		}
	}
	return false
}
// computeExternalURL turns a raw external URL string into a sanitized *url.URL.
//
// When u is empty, the URL is built as "http://<hostname>:<port>/" from the OS
// hostname and the port part of listenAddr. A value that begins or ends with a
// quote character is rejected. In the returned URL, trailing slashes are
// stripped from the path and a non-empty path always begins with "/".
func computeExternalURL(u, listenAddr string) (*url.URL, error) {
	raw := u
	if raw == "" {
		host, err := os.Hostname()
		if err != nil {
			return nil, err
		}
		_, port, err := net.SplitHostPort(listenAddr)
		if err != nil {
			return nil, err
		}
		raw = "http://" + host + ":" + port + "/"
	}

	if startsOrEndsWithQuote(raw) {
		return nil, fmt.Errorf("URL must not begin or end with quotes")
	}

	parsed, err := url.Parse(raw)
	if err != nil {
		return nil, err
	}

	// Normalize the path: no trailing slashes, leading slash when non-empty.
	prefix := strings.TrimRight(parsed.Path, "/")
	if !(prefix == "" || strings.HasPrefix(prefix, "/")) {
		prefix = "/" + prefix
	}
	parsed.Path = prefix

	return parsed, nil
}
// sendAlerts returns a rules.NotifyFunc that converts rule alerts into
// notifier alerts and forwards them to n.
//
// Each outgoing alert's GeneratorURL is externalURL followed by the table link
// for the rule expression. EndsAt is the alert's ResolvedAt when that is set,
// otherwise its ValidUntil. Nothing is sent when no alerts are passed in.
func sendAlerts(n *notifier.Manager, externalURL string) rules.NotifyFunc {
	return func(ctx context.Context, expr string, alerts ...*rules.Alert) {
		var outgoing []*notifier.Alert
		for i := range alerts {
			src := alerts[i]

			endsAt := src.ValidUntil
			if !src.ResolvedAt.IsZero() {
				endsAt = src.ResolvedAt
			}

			outgoing = append(outgoing, &notifier.Alert{
				StartsAt:     src.FiredAt,
				Labels:       src.Labels,
				Annotations:  src.Annotations,
				GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
				EndsAt:       endsAt,
			})
		}

		if len(alerts) == 0 {
			return
		}
		n.Send(outgoing...)
	}
}
func initialize(options *Options) (*sqlx.DB, error) {
db, err := connect(options.getPrimary())
......@@ -135,6 +447,48 @@ func connect(cfg *namespaceConfig) (*sqlx.DB, error) {
return cfg.Connector(cfg)
}
// GetRules reads the stored rule groups from the rules table of localDB.
//
// It returns (nil, nil) when the table is empty, the single row when exactly
// one exists, and an internal ApiError when the query fails or more than one
// row is found.
func (r *ClickHouseReader) GetRules(localDB *sqlx.DB) (*model.RuleGroups, *model.ApiError) {
	rows := make([]*model.RuleGroups, 0)
	if err := localDB.Select(&rows, "SELECT id, updated_at, data FROM rules"); err != nil {
		return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}

	switch len(rows) {
	case 0:
		return nil, nil
	case 1:
		return rows[0], nil
	default:
		return nil, &model.ApiError{Typ: model.ErrorInternal, Err: fmt.Errorf("multiple rule entry detected from db")}
	}
}
// SetRules persists rule as the single entry of the rules table in localDB.
//
// If GetRules finds an existing row, that row's data and updated_at are
// overwritten; otherwise a new row is inserted. Any error from GetRules is
// returned unchanged, and a failing write is reported as an internal ApiError.
func (r *ClickHouseReader) SetRules(localDB *sqlx.DB, rule string) *model.ApiError {
	existing, apiErr := r.GetRules(localDB)
	if apiErr != nil {
		return apiErr
	}

	// rule is caller-supplied text, so it must never be spliced into the SQL
	// string: a single quote in the rule would break the statement or allow
	// injection. All values are passed as bound parameters instead; Rebind
	// converts the "?" placeholders to whatever bindvar style the driver uses.
	//
	// The timestamp is handed to the driver as a time.Time rather than being
	// formatted with %s, which would store the Time.String() debug form
	// (including the monotonic clock reading).
	now := time.Now()

	var err error
	if existing != nil {
		_, err = localDB.Exec(
			localDB.Rebind("UPDATE rules SET updated_at=?, data=? WHERE id=?"),
			now, rule, existing.Id,
		)
	} else {
		_, err = localDB.Exec(
			localDB.Rebind("INSERT INTO rules (updated_at, data) VALUES (?, ?)"),
			now, rule,
		)
	}
	if err != nil {
		return &model.ApiError{Typ: model.ErrorInternal, Err: err}
	}
	return nil
}
func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, queryParams *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) {
qry, err := r.queryEngine.NewInstantQuery(r.remoteStorage, queryParams.Query, queryParams.Time)
if err != nil {
......
......@@ -361,7 +361,12 @@ func (aH *APIHandler) getRules(w http.ResponseWriter, r *http.Request) {
return
}
aH.respond(w, rules.Data)
if rules != nil {
aH.respond(w, rules.Data)
return
}
aH.respond(w, "")
}
......@@ -369,20 +374,17 @@ func (aH *APIHandler) setRules(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
type rulesRequest struct {
data string
}
var rules *rulesRequest
err := decoder.Decode(rules)
var postData map[string]string
err := decoder.Decode(&postData)
if err != nil {
aH.respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
apiErrorObj := (*aH.reader).SetRules(aH.localDB, rules.data)
apiErrorObj := (*aH.reader).SetRules(aH.localDB, postData["data"])
if apiErrorObj.Err != nil {
if apiErrorObj != nil {
aH.respondError(w, apiErrorObj, nil)
return
}
......
......@@ -36,7 +36,7 @@ type QueryData struct {
// RuleGroups is one row of the rules table: the stored rule definition plus
// its row id and last-update timestamp. The db tags name the columns the row
// is scanned from; the json tags name the fields when it is serialized.
type RuleGroups struct {
	Id int `json:"id" db:"id"`
	// UpdatedAt maps to the updated_at column. The stale created_at mapping is
	// dropped: it declared the same field a second time.
	UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
	Data      string    `json:"data" db:"data"`
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment