Commit 13973f6a authored by Ankit Nayan

chore: alerts WIP

parent 2e8e4b7b
Showing with 95 additions and 341 deletions
+95 -341
@@ -2,40 +2,23 @@ package clickhouseReader
import (
"context"
"crypto/md5"
"encoding/json"
"errors"
"flag"
"fmt"
"net"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
"github.com/oklog/oklog/pkg/group"
"github.com/pkg/errors"
_ "github.com/ClickHouse/clickhouse-go"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/jmoiron/sqlx"
"github.com/prometheus/client_golang/prometheus"
promModel "github.com/prometheus/common/model"
"github.com/prometheus/common/promlog"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/discovery"
sd_config "github.com/prometheus/prometheus/discovery/config"
"github.com/prometheus/prometheus/notifier"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/rules"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/storage/remote"
"github.com/prometheus/prometheus/util/stats"
"github.com/prometheus/prometheus/util/strutil"
"github.com/prometheus/tsdb"
"go.signoz.io/query-service/constants"
"go.signoz.io/query-service/model"
@@ -112,226 +95,18 @@ func NewReader() *ClickHouseReader {
remoteStorage := remote.NewStorage(log.With(logger, "component", "remote"), startTime, time.Duration(1*time.Minute))
// conf, err := config.LoadFile(*filename)
// if err != nil {
// zap.S().Error("couldn't load configuration (--config.file=%q): %v", filename, err)
// }
// err = remoteStorage.ApplyConfig(conf)
// if err != nil {
// zap.S().Error("Error in remoteStorage.ApplyConfig: ", err)
// }
cfg := struct {
configFile string
localStoragePath string
notifier notifier.Options
notifierTimeout promModel.Duration
forGracePeriod promModel.Duration
outageTolerance promModel.Duration
resendDelay promModel.Duration
tsdb tsdb.Options
lookbackDelta promModel.Duration
webTimeout promModel.Duration
queryTimeout promModel.Duration
queryConcurrency int
queryMaxSamples int
RemoteFlushDeadline promModel.Duration
prometheusURL string
logLevel promlog.AllowedLevel
}{
notifier: notifier.Options{
Registerer: prometheus.DefaultRegisterer,
},
}
cfg.configFile = *flag.String("config", "./config/prometheus.yml", "(prometheus config to read metrics)")
filename := flag.String("config", "./config/prometheus.yml", "(prometheus config to read metrics)")
flag.Parse()
// fanoutStorage := remoteStorage
fanoutStorage := storage.NewFanout(logger, remoteStorage)
localStorage := remoteStorage
cfg.notifier.QueueCapacity = 10000
cfg.notifierTimeout = promModel.Duration(time.Duration.Seconds(10))
notifier := notifier.NewManager(&cfg.notifier, log.With(logger, "component", "notifier"))
// notifier.ApplyConfig(conf)
ExternalURL, err := computeExternalURL("", "0.0.0.0:9090")
conf, err := config.LoadFile(*filename)
if err != nil {
fmt.Fprintln(os.Stderr, errors.Wrapf(err, "parse external URL %q", ExternalURL.String()))
os.Exit(2)
}
cfg.outageTolerance = promModel.Duration(time.Duration.Hours(1))
cfg.forGracePeriod = promModel.Duration(time.Duration.Minutes(10))
cfg.resendDelay = promModel.Duration(time.Duration.Minutes(1))
ruleManager := rules.NewManager(&rules.ManagerOptions{
Appendable: fanoutStorage,
TSDB: localStorage,
QueryFunc: rules.EngineQueryFunc(queryEngine, fanoutStorage),
NotifyFunc: sendAlerts(notifier, ExternalURL.String()),
Context: context.Background(),
ExternalURL: ExternalURL,
Registerer: prometheus.DefaultRegisterer,
Logger: log.With(logger, "component", "rule manager"),
OutageTolerance: time.Duration(cfg.outageTolerance),
ForGracePeriod: time.Duration(cfg.forGracePeriod),
ResendDelay: time.Duration(cfg.resendDelay),
})
ctxNotify, cancelNotify := context.WithCancel(context.Background())
discoveryManagerNotify := discovery.NewManager(ctxNotify, log.With(logger, "component", "discovery manager notify"), discovery.Name("notify"))
reloaders := []func(cfg *config.Config) error{
remoteStorage.ApplyConfig,
// The Scrape and notifier managers need to reload before the Discovery manager as
// they need to read the most updated config when receiving the new targets list.
notifier.ApplyConfig,
func(cfg *config.Config) error {
c := make(map[string]sd_config.ServiceDiscoveryConfig)
for _, v := range cfg.AlertingConfig.AlertmanagerConfigs {
// AlertmanagerConfigs doesn't hold a unique identifier so we use the config hash as the identifier.
b, err := json.Marshal(v)
if err != nil {
return err
}
c[fmt.Sprintf("%x", md5.Sum(b))] = v.ServiceDiscoveryConfig
}
return discoveryManagerNotify.ApplyConfig(c)
},
func(cfg *config.Config) error {
// Get all rule files matching the configuration paths.
var files []string
for _, pat := range cfg.RuleFiles {
fs, err := filepath.Glob(pat)
if err != nil {
// The only error can be a bad pattern.
return fmt.Errorf("error retrieving rule files for %s: %s", pat, err)
}
files = append(files, fs...)
}
return ruleManager.Update(time.Duration(cfg.GlobalConfig.EvaluationInterval), files)
},
zap.S().Error("couldn't load configuration (--config.file=%q): %v", filename, err)
}
// ruleManager.Run()
// defer ruleManager.Stop()
// err = discoveryManagerNotify.Run()
// if err != nil {
// zap.S().Error("Error in discoveryManagerNotify.Run()")
// }
// sync.Once is used to make sure we can close the channel at different execution stages (SIGTERM or when the config is loaded).
type closeOnce struct {
C chan struct{}
once sync.Once
Close func()
}
// Wait until the server is ready to handle reloading.
reloadReady := &closeOnce{
C: make(chan struct{}),
}
reloadReady.Close = func() {
reloadReady.once.Do(func() {
close(reloadReady.C)
})
}
var g group.Group
{
// Notify discovery manager.
g.Add(
func() error {
err := discoveryManagerNotify.Run()
level.Info(logger).Log("msg", "Notify discovery manager stopped")
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping notify discovery manager...")
cancelNotify()
},
)
}
{
// Initial configuration loading.
cancel := make(chan struct{})
g.Add(
func() error {
// select {
// case <-dbOpen:
// break
// // In case a shutdown is initiated before the dbOpen is released
// case <-cancel:
// reloadReady.Close()
// return nil
// }
if err := reloadConfig(cfg.configFile, logger, reloaders...); err != nil {
return fmt.Errorf("error loading config from %q: %s", cfg.configFile, err)
}
reloadReady.Close()
<-cancel
return nil
},
func(err error) {
close(cancel)
},
)
}
{
// Rule manager.
// TODO(krasi) refactor ruleManager.Run() to be blocking to avoid using an extra blocking channel.
cancel := make(chan struct{})
g.Add(
func() error {
<-reloadReady.C
ruleManager.Run()
<-cancel
return nil
},
func(err error) {
ruleManager.Stop()
close(cancel)
},
)
}
{
// Notifier.
// Calling notifier.Stop() before ruleManager.Stop() will cause a panic if the ruleManager isn't running,
// so keep this interrupt after the ruleManager.Stop().
g.Add(
func() error {
// When the notifier manager receives a new targets list
// it needs to read a valid config for each job.
// It depends on the config being in sync with the discovery manager
// so we wait until the config is fully loaded.
<-reloadReady.C
notifier.Run(discoveryManagerNotify.SyncCh())
level.Info(logger).Log("msg", "Notifier manager stopped")
return nil
},
func(err error) {
notifier.Stop()
},
)
}
if err := g.Run(); err != nil {
level.Error(logger).Log("err", err)
os.Exit(1)
err = remoteStorage.ApplyConfig(conf)
if err != nil {
zap.S().Error("Error in remoteStorage.ApplyConfig: ", err)
}
// notifier.Run(discoveryManagerNotify.SyncCh())
// defer notifier.Stop()
return &ClickHouseReader{
db: db,
operationsTable: options.primary.OperationsTable,
@@ -342,92 +117,6 @@ func NewReader() *ClickHouseReader {
}
}
func reloadConfig(filename string, logger log.Logger, rls ...func(*config.Config) error) (err error) {
level.Info(logger).Log("msg", "Loading configuration file", "filename", filename)
conf, err := config.LoadFile(filename)
if err != nil {
return fmt.Errorf("couldn't load configuration (--config.file=%q): %v", filename, err)
}
failed := false
for _, rl := range rls {
if err := rl(conf); err != nil {
level.Error(logger).Log("msg", "Failed to apply configuration", "err", err)
failed = true
}
}
if failed {
return fmt.Errorf("one or more errors occurred while applying the new configuration (--config.file=%q)", filename)
}
level.Info(logger).Log("msg", "Completed loading of configuration file", "filename", filename)
return nil
}
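The reloader pattern above mirrors what Prometheus itself does: each reloader is a func(*config.Config) error, and reloadConfig parses the file once, then applies every reloader and reports whether any failed. A minimal standalone sketch of that flow, using the same single-argument config.LoadFile seen in this file (the config path and the sample reloader body are illustrative, not from this commit):

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/config"
)

func main() {
	// Each reloader receives the freshly parsed Prometheus config.
	reloaders := []func(*config.Config) error{
		func(cfg *config.Config) error {
			fmt.Println("rule files:", cfg.RuleFiles)
			return nil
		},
	}

	conf, err := config.LoadFile("./config/prometheus.yml") // path is illustrative
	if err != nil {
		fmt.Println("couldn't load configuration:", err)
		return
	}

	failed := false
	for _, rl := range reloaders {
		if err := rl(conf); err != nil {
			fmt.Println("failed to apply configuration:", err)
			failed = true
		}
	}
	fmt.Println("reload failed:", failed)
}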
func startsOrEndsWithQuote(s string) bool {
return strings.HasPrefix(s, "\"") || strings.HasPrefix(s, "'") ||
strings.HasSuffix(s, "\"") || strings.HasSuffix(s, "'")
}
// computeExternalURL computes a sanitized external URL from a raw input. It infers unset
// URL parts from the OS and the given listen address.
func computeExternalURL(u, listenAddr string) (*url.URL, error) {
if u == "" {
hostname, err := os.Hostname()
if err != nil {
return nil, err
}
_, port, err := net.SplitHostPort(listenAddr)
if err != nil {
return nil, err
}
u = fmt.Sprintf("http://%s:%s/", hostname, port)
}
if startsOrEndsWithQuote(u) {
return nil, fmt.Errorf("URL must not begin or end with quotes")
}
eu, err := url.Parse(u)
if err != nil {
return nil, err
}
ppref := strings.TrimRight(eu.Path, "/")
if ppref != "" && !strings.HasPrefix(ppref, "/") {
ppref = "/" + ppref
}
eu.Path = ppref
return eu, nil
}
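As a quick illustration of the URL inference above, a standalone sketch of the same steps computeExternalURL("", "0.0.0.0:9090") performs when the external URL is empty (the printed host is machine-dependent):

package main

import (
	"fmt"
	"net"
	"net/url"
	"os"
)

func main() {
	// Host comes from os.Hostname(), port from the listen address.
	listenAddr := "0.0.0.0:9090"
	hostname, _ := os.Hostname()
	_, port, err := net.SplitHostPort(listenAddr)
	if err != nil {
		fmt.Println(err)
		return
	}
	u, err := url.Parse(fmt.Sprintf("http://%s:%s/", hostname, port))
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(u.String()) // e.g. http://my-host:9090/
}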
// sendAlerts implements the rules.NotifyFunc for a Notifier.
func sendAlerts(n *notifier.Manager, externalURL string) rules.NotifyFunc {
return func(ctx context.Context, expr string, alerts ...*rules.Alert) {
var res []*notifier.Alert
for _, alert := range alerts {
a := &notifier.Alert{
StartsAt: alert.FiredAt,
Labels: alert.Labels,
Annotations: alert.Annotations,
GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
}
if !alert.ResolvedAt.IsZero() {
a.EndsAt = alert.ResolvedAt
} else {
a.EndsAt = alert.ValidUntil
}
res = append(res, a)
}
if len(alerts) > 0 {
n.Send(res...)
}
}
}
func initialize(options *Options) (*sqlx.DB, error) {
db, err := connect(options.getPrimary())
@@ -1167,7 +856,7 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context, ttlParams *model.TTLParam
func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLParams) (*model.GetTTLResponseItem, *model.ApiError) {
parseTTL := func(queryResp string) string {
parseTTL := func(queryResp string) int {
values := strings.Split(queryResp, " ")
N := len(values)
ttlIdx := -1
@@ -1179,12 +868,17 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
}
}
if ttlIdx == -1 {
return ""
return ttlIdx
}
output := strings.SplitN(values[ttlIdx], "(", 2)
timePart := strings.Trim(output[1], ")")
return timePart
seconds_int, err := strconv.Atoi(timePart)
if err != nil {
return -1
}
ttl_hrs := seconds_int / 3600
return ttl_hrs
}
getMetricsTTL := func() (*model.DBResponseTTL, *model.ApiError) {
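The arithmetic in the new parseTTL closure can be traced with a hypothetical token of the form toIntervalSecond(<seconds>); the code that locates ttlIdx is outside this hunk, so the exact token shape is an assumption. Standalone sketch:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

func main() {
	// Hypothetical TTL token from a ClickHouse SHOW CREATE TABLE response.
	token := "toIntervalSecond(7200)"

	output := strings.SplitN(token, "(", 2)
	timePart := strings.Trim(output[1], ")")
	secondsInt, err := strconv.Atoi(timePart)
	if err != nil {
		fmt.Println(-1)
		return
	}
	fmt.Println(secondsInt / 3600) // 7200 seconds -> 2 (hours)
}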
@@ -13,28 +13,16 @@ import (
"go.uber.org/zap"
)
// const (
// ErrorNone ErrorType = ""
// ErrorTimeout ErrorType = "timeout"
// ErrorCanceled ErrorType = "canceled"
// ErrorExec ErrorType = "execution"
// ErrorBadData ErrorType = "bad_data"
// ErrorInternal ErrorType = "internal"
// ErrorUnavailable ErrorType = "unavailable"
// ErrorNotFound ErrorType = "not_found"
// ErrorNotImplemented ErrorType = "not_implemented"
// )
// This time the global variable is unexported.
var db *sqlx.DB
// InitDB sets up the connection pool global variable.
func InitDB(dataSourceName string) error {
func InitDB(dataSourceName string) (*sqlx.DB, error) {
var err error
db, err = sqlx.Open("sqlite3", dataSourceName)
if err != nil {
return err
return nil, err
}
table_schema := `CREATE TABLE IF NOT EXISTS dashboards (
@@ -47,10 +35,21 @@ func InitDB(dataSourceName string) error {
_, err = db.Exec(table_schema)
if err != nil {
return fmt.Errorf("Error in creating dashboard table: ", err.Error())
return nil, fmt.Errorf("Error in creating dashboard table: ", err.Error())
}
return nil
table_schema = `CREATE TABLE IF NOT EXISTS rules (
id INTEGER PRIMARY KEY AUTOINCREMENT,
updated_at datetime NOT NULL,
data TEXT NOT NULL
);`
_, err = db.Exec(table_schema)
if err != nil {
return nil, fmt.Errorf("Error in creating rules table: ", err.Error())
}
return db, nil
}
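A possible caller-side sketch of the new InitDB signature and the rules table it creates. The import path go.signoz.io/query-service/dashboards and the sample INSERT values are assumptions drawn from this diff, and the sqlite3 driver must already be registered somewhere in the binary for sqlx.Open("sqlite3", ...) to work:

package main

import (
	"log"
	"time"

	"go.signoz.io/query-service/dashboards"
)

func main() {
	// Same call NewAPIHandler makes below; it now returns the shared *sqlx.DB.
	db, err := dashboards.InitDB("./signoz.db")
	if err != nil {
		log.Fatal(err)
	}

	// Illustrative write against the rules(id, updated_at, data) schema
	// created by InitDB above.
	_, err = db.Exec(`INSERT INTO rules (updated_at, data) VALUES (?, ?)`,
		time.Now(), `{"alert": "ExampleRule"}`)
	if err != nil {
		log.Fatal(err)
	}
}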
type Dashboard struct {
@@ -5,6 +5,7 @@ import (
"fmt"
"os"
"github.com/jmoiron/sqlx"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/util/stats"
"go.signoz.io/query-service/druidQuery"
@@ -51,7 +52,14 @@ func (druid *DruidReader) GetInstantQueryMetricsResult(ctx context.Context, quer
return nil, nil, &model.ApiError{model.ErrorNotImplemented, fmt.Errorf("Druid does not support metrics")}
}
func (druid *DruidReader) GetRules(localDB *sqlx.DB) (*model.RuleGroups, *model.ApiError) {
return nil, &model.ApiError{model.ErrorNotImplemented, fmt.Errorf("Druid does not support setting rules for alerting")}
}
func (druid *DruidReader) SetRules(localDB *sqlx.DB, alert string) *model.ApiError {
return &model.ApiError{model.ErrorNotImplemented, fmt.Errorf("Druid does not support setting rules for alerting")}
}
func (druid *DruidReader) GetServiceOverview(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceOverviewItem, error) {
return druidQuery.GetServiceOverview(druid.SqlClient, query)
}
@@ -38,7 +38,7 @@ type APIHandler struct {
reader *Reader
pc *posthog.Client
distinctId string
db *sqlx.DB
localDB *sqlx.DB
ready func(http.HandlerFunc) http.HandlerFunc
}
@@ -52,10 +52,11 @@ func NewAPIHandler(reader *Reader, pc *posthog.Client, distinctId string) (*APIH
}
aH.ready = aH.testReady
err := dashboards.InitDB("/var/lib/signoz/signoz.db")
localDB, err := dashboards.InitDB("./signoz.db")
if err != nil {
return nil, err
}
aH.localDB = localDB
errReadingDashboards := dashboards.LoadDashboardFiles()
if errReadingDashboards != nil {
@@ -172,7 +173,8 @@ func (aH *APIHandler) respond(w http.ResponseWriter, data interface{}) {
func (aH *APIHandler) RegisterRoutes(router *mux.Router) {
router.HandleFunc("/api/v1/query_range", aH.queryRangeMetrics).Methods(http.MethodGet)
router.HandleFunc("/api/v1/query", aH.queryMetrics).Methods(http.MethodGet)
router.HandleFunc("/api/v1/rules", aH.setRules).Methods(http.MethodPost, http.MethodPut)
router.HandleFunc("/api/v1/rules", aH.getRules).Methods(http.MethodGet)
router.HandleFunc("/api/v1/dashboards", aH.getDashboards).Methods(http.MethodGet)
router.HandleFunc("/api/v1/dashboards", aH.createDashboards).Methods(http.MethodPost)
router.HandleFunc("/api/v1/dashboards/{uuid}", aH.getDashboard).Methods(http.MethodGet)
@@ -351,6 +353,44 @@ func (aH *APIHandler) createDashboards(w http.ResponseWriter, r *http.Request) {
}
func (aH *APIHandler) getRules(w http.ResponseWriter, r *http.Request) {
rules, err := (*aH.reader).GetRules(aH.localDB)
if err != nil {
aH.respondError(w, err, nil)
return
}
aH.respond(w, rules.Data)
}
func (aH *APIHandler) setRules(w http.ResponseWriter, r *http.Request) {
decoder := json.NewDecoder(r.Body)
type rulesRequest struct {
data string
}
var rules *rulesRequest
err := decoder.Decode(rules)
if err != nil {
aH.respondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, nil)
return
}
apiErrorObj := (*aH.reader).SetRules(aH.localDB, rules.data)
if apiErrorObj.Err != nil {
aH.respondError(w, apiErrorObj, nil)
return
}
aH.respond(w, "rules successfully set")
}
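For the decode step in setRules, a standalone sketch of reading a JSON body of the assumed shape {"data": "..."}. Note that encoding/json only populates exported fields and needs a non-nil target, so the sketch uses an exported, tagged Data field:

package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

type rulesRequest struct {
	Data string `json:"data"` // exported so encoding/json can populate it
}

func main() {
	// Illustrative request body; a real handler would use r.Body.
	body := strings.NewReader(`{"data": "groups:\n- name: example"}`)

	var req rulesRequest
	if err := json.NewDecoder(body).Decode(&req); err != nil {
		fmt.Println("bad request body:", err)
		return
	}
	fmt.Println(req.Data)
}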
func (aH *APIHandler) queryRangeMetrics(w http.ResponseWriter, r *http.Request) {
query, apiErrorObj := parseQueryRangeRequest(r)
@@ -3,6 +3,7 @@ package app
import (
"context"
"github.com/jmoiron/sqlx"
"github.com/prometheus/prometheus/promql"
"github.com/prometheus/prometheus/util/stats"
"go.signoz.io/query-service/model"
@@ -10,6 +11,8 @@ import (
type Reader interface {
// Getter Interfaces
GetRules(localDB *sqlx.DB) (*model.RuleGroups, *model.ApiError)
SetRules(localDB *sqlx.DB, alert string) *model.ApiError
GetInstantQueryMetricsResult(ctx context.Context, query *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError)
GetQueryRangeResult(ctx context.Context, query *model.QueryRangeParams) (*promql.Result, *stats.QueryStats, *model.ApiError)
GetServiceOverview(ctx context.Context, query *model.GetServiceOverviewParams) (*[]model.ServiceOverviewItem, error)
@@ -567,6 +567,10 @@ func parseTimestamp(param string, r *http.Request) (*string, error) {
return &timeStr, nil
}
func parseSetRulesRequest(r *http.Request) (string, *model.ApiError) {
return "", nil
}
func parseDuration(r *http.Request) (*model.TTLParams, error) {
@@ -8,4 +8,4 @@ groups:
severity: warning
annotations:
summary: High CPU load
description: "CPU load is > 0.1\n VALUE = {{ $value }}\n LABELS = {{ $labels }}"
description: "CPU load is > 0.1\n VALUE = {{ $value }}\n LABELS = {{ $labels }}"
\ No newline at end of file
@@ -34,6 +34,12 @@ type QueryData struct {
Stats *stats.QueryStats `json:"stats,omitempty"`
}
type RuleGroups struct {
Id int `json:"id" db:"id"`
UpdatedAt time.Time `json:"created_at" db:"created_at"`
Data string `json:"data" db:"data"`
}
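For reference, what a RuleGroups value serializes to with the JSON tags above (the values are illustrative only):

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

type RuleGroups struct {
	Id        int       `json:"id" db:"id"`
	UpdatedAt time.Time `json:"created_at" db:"created_at"`
	Data      string    `json:"data" db:"data"`
}

func main() {
	rg := RuleGroups{Id: 1, UpdatedAt: time.Date(2021, 5, 1, 0, 0, 0, 0, time.UTC), Data: "groups: []"}
	out, _ := json.Marshal(rg)
	fmt.Println(string(out))
	// {"id":1,"created_at":"2021-05-01T00:00:00Z","data":"groups: []"}
}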
type ServiceItem struct {
ServiceName string `json:"serviceName" db:"serviceName"`
Percentile99 float32 `json:"p99" db:"p99"`