Commit 5262508e authored by makeavish's avatar makeavish
Browse files

fix: :bug: convert TTL APIs to async

No related merge requests found
Showing with 189 additions and 33 deletions
+189 -33
......@@ -2268,46 +2268,113 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query
func (r *ClickHouseReader) SetTTL(ctx context.Context,
params *model.TTLParams) (*model.SetTTLResponseItem, *model.ApiError) {
_, err := r.localDB.Exec("DELETE FROM ttl_status WHERE created_at <= datetime('now','-7 days')")
if err != nil {
zap.S().Debug("Error in processing ttl_status delete sql query: ", err)
}
var req, tableName string
switch params.Type {
case constants.TraceTTL:
tableNameArray := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable}
for _, tableName = range tableNameArray {
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")}
}
if statusItem.Status == constants.StatusPending && statusItem.UpdatedAt.Unix()-time.Now().Unix() < 3600 {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("TTL is already running")}
}
}
for _, tableName := range tableNameArray {
go func(tableName string) {
_, dbErr := r.localDB.Exec("INSERT INTO ttl_status (created_at, updated_at, table_name, ttl, status) VALUES (?, ?, ?, ?, ?)", time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending)
if dbErr != nil {
zap.S().Error(fmt.Errorf("Error in inserting to ttl_status table: %s", dbErr.Error()))
return
}
req = fmt.Sprintf(
"ALTER TABLE %v MODIFY TTL toDateTime(timestamp) + INTERVAL %v SECOND DELETE",
tableName, params.DelDuration)
if len(params.ColdStorageVolume) > 0 {
req += fmt.Sprintf(", toDateTime(timestamp) + INTERVAL %v SECOND TO VOLUME '%s'",
params.ToColdStorageDuration, params.ColdStorageVolume)
}
err := r.setColdStorage(context.Background(), tableName, params.ColdStorageVolume)
if err != nil {
zap.S().Error(fmt.Errorf("Error in setting cold storage: %s", err.Err.Error()))
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
if err == nil {
_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
if dbErr != nil {
zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr)
return
}
}
return
}
zap.S().Debugf("Executing TTL request: %s\n", req)
statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
if err := r.db.Exec(context.Background(), req); err != nil {
zap.S().Error(fmt.Errorf("Error in executing set TTL query: %s", err.Error()))
if err == nil {
_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
if dbErr != nil {
zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr)
return
}
}
return
}
_, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id)
if dbErr != nil {
zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr)
return
}
}(tableName)
}
case constants.MetricsTTL:
tableName = signozMetricDBName + "." + signozSampleName
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
if err != nil {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")}
}
if statusItem.Status == constants.StatusPending && statusItem.UpdatedAt.Unix()-time.Now().Unix() < 3600 {
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("TTL is already running")}
}
go func(tableName string) {
_, dbErr := r.localDB.Exec("INSERT INTO ttl_status (created_at, updated_at, table_name, ttl, status) VALUES (?, ?, ?, ?, ?)", time.Now(), time.Now(), tableName, params.DelDuration, constants.StatusPending)
if dbErr != nil {
zap.S().Error(fmt.Errorf("Error in inserting to ttl_status table: %s", dbErr.Error()))
return
}
req = fmt.Sprintf(
"ALTER TABLE %v MODIFY TTL toDateTime(timestamp) + INTERVAL %v SECOND DELETE",
tableName, params.DelDuration)
"ALTER TABLE %v MODIFY TTL toDateTime(toUInt32(timestamp_ms / 1000), 'UTC') + "+
"INTERVAL %v SECOND DELETE", tableName, params.DelDuration)
if len(params.ColdStorageVolume) > 0 {
req += fmt.Sprintf(", toDateTime(timestamp) + INTERVAL %v SECOND TO VOLUME '%s'",
req += fmt.Sprintf(", toDateTime(toUInt32(timestamp_ms / 1000), 'UTC')"+
" + INTERVAL %v SECOND TO VOLUME '%s'",
params.ToColdStorageDuration, params.ColdStorageVolume)
}
err := r.setColdStorage(ctx, tableName, params.ColdStorageVolume)
if err != nil {
return nil, err
}
zap.S().Debugf("Executing TTL request: %s\n", req)
statusItem, _ := r.checkTTLStatusItem(ctx, tableName)
if err := r.db.Exec(ctx, req); err != nil {
zap.S().Error(fmt.Errorf("error while setting ttl. Err=%v", err))
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. Err=%v", err)}
if err == nil {
_, dbErr := r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusFailed, statusItem.Id)
if dbErr != nil {
zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr)
return
}
}
return
}
}
case constants.MetricsTTL:
tableName = signozMetricDBName + "." + signozSampleName
req = fmt.Sprintf(
"ALTER TABLE %v MODIFY TTL toDateTime(toUInt32(timestamp_ms / 1000), 'UTC') + "+
"INTERVAL %v SECOND DELETE", tableName, params.DelDuration)
if len(params.ColdStorageVolume) > 0 {
req += fmt.Sprintf(", toDateTime(toUInt32(timestamp_ms / 1000), 'UTC')"+
" + INTERVAL %v SECOND TO VOLUME '%s'",
params.ToColdStorageDuration, params.ColdStorageVolume)
}
zap.S().Debugf("Executing TTL request: %s\n", req)
if err := r.db.Exec(ctx, req); err != nil {
zap.S().Error(fmt.Errorf("error while setting ttl. Err=%v", err))
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. Err=%v", err)}
}
_, dbErr = r.localDB.Exec("UPDATE ttl_status SET updated_at = ?, status = ? WHERE id = ?", time.Now(), constants.StatusSuccess, statusItem.Id)
if dbErr != nil {
zap.S().Debug("Error in processing ttl_status update sql query: ", dbErr)
return
}
}(tableName)
default:
return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error while setting ttl. ttl type should be <metrics|traces>, got %v",
......@@ -2317,6 +2384,44 @@ func (r *ClickHouseReader) SetTTL(ctx context.Context,
return &model.SetTTLResponseItem{Message: "move ttl has been successfully set up"}, nil
}
func (r *ClickHouseReader) checkTTLStatusItem(ctx context.Context, tableName string) (model.TTLStatusItem, error) {
statusItem := []model.TTLStatusItem{}
query := fmt.Sprintf("SELECT id, status FROM ttl_status WHERE table_name = '%s' ORDER BY created_at DESC", tableName)
err := r.localDB.Select(&statusItem, query)
zap.S().Info(query)
if err != nil || len(statusItem) == 0 {
zap.S().Debug("Error in processing sql query: ", err)
return model.TTLStatusItem{}, err
}
return statusItem[0], err
}
func (r *ClickHouseReader) setTTLQueryStatus(ctx context.Context, tableNameArray []string) (string, *model.ApiError) {
failFlag := false
status := constants.StatusSuccess
for _, tableName := range tableNameArray {
statusItem, err := r.checkTTLStatusItem(ctx, tableName)
if err != nil {
return "", &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("Error in processing ttl_status check sql query")}
}
if statusItem.Status == constants.StatusPending && statusItem.UpdatedAt.Unix()-time.Now().Unix() < 3600 {
status = constants.StatusPending
return status, nil
}
if statusItem.Status == constants.StatusFailed {
failFlag = true
}
}
if failFlag {
status = constants.StatusFailed
}
return status, nil
}
func (r *ClickHouseReader) setColdStorage(ctx context.Context, tableName string, coldStorageVolume string) *model.ApiError {
// Set the storage policy for the required table. If it is already set, then setting it again
......@@ -2451,29 +2556,52 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
switch ttlParams.Type {
case constants.TraceTTL:
tableNameArray := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable}
status, err := r.setTTLQueryStatus(ctx, tableNameArray)
if err != nil {
return nil, err
}
dbResp, err := getTracesTTL()
if err != nil {
return nil, err
}
delTTL, moveTTL := parseTTL(dbResp.EngineFull)
return &model.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL}, nil
return &model.GetTTLResponseItem{TracesTime: delTTL, TracesMoveTime: moveTTL, Status: status}, nil
case constants.MetricsTTL:
tableNameArray := []string{signozMetricDBName + "." + signozSampleName}
status, err := r.setTTLQueryStatus(ctx, tableNameArray)
if err != nil {
return nil, err
}
dbResp, err := getMetricsTTL()
if err != nil {
return nil, err
}
delTTL, moveTTL := parseTTL(dbResp.EngineFull)
return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL}, nil
return &model.GetTTLResponseItem{MetricsTime: delTTL, MetricsMoveTime: moveTTL, Status: status}, nil
}
tableNameArray := []string{signozTraceDBName + "." + signozTraceTableName, signozTraceDBName + "." + signozDurationMVTable, signozTraceDBName + "." + signozSpansTable, signozTraceDBName + "." + signozErrorIndexTable}
status, err := r.setTTLQueryStatus(ctx, tableNameArray)
if err != nil {
return nil, err
}
db1, err := getTracesTTL()
if err != nil {
return nil, err
}
if status == constants.StatusSuccess {
tableNameArray = []string{signozMetricDBName + "." + signozSampleName}
status, err = r.setTTLQueryStatus(ctx, tableNameArray)
if err != nil {
return nil, err
}
}
db2, err := getMetricsTTL()
if err != nil {
return nil, err
......@@ -2486,6 +2614,7 @@ func (r *ClickHouseReader) GetTTL(ctx context.Context, ttlParams *model.GetTTLPa
TracesMoveTime: tracesMoveTTL,
MetricsTime: metricsDelTTL,
MetricsMoveTime: metricsMoveTTL,
Status: status,
}, nil
}
......
......@@ -66,6 +66,20 @@ func InitDB(dataSourceName string) (*sqlx.DB, error) {
return nil, fmt.Errorf("Error in creating notification_channles table: %s", err.Error())
}
table_schema = `CREATE TABLE IF NOT EXISTS ttl_status (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at datetime NOT NULL,
updated_at datetime NOT NULL,
table_name TEXT NOT NULL,
ttl INTEGER DEFAULT 0,
status TEXT NOT NULL
);`
_, err = db.Exec(table_schema)
if err != nil {
return nil, fmt.Errorf("Error in creating ttl_status table: %s", err.Error())
}
return db, nil
}
......
......@@ -57,6 +57,9 @@ const (
Descending = "descending"
Ascending = "ascending"
ContextTimeout = 60 // seconds
StatusPending = "pending"
StatusFailed = "failed"
StatusSuccess = "success"
)
func GetOrDefaultEnv(key string, fallback string) string {
......
......@@ -48,6 +48,15 @@ type RuleResponseItem struct {
Data string `json:"data" db:"data"`
}
type TTLStatusItem struct {
Id int `json:"id" db:"id"`
UpdatedAt time.Time `json:"updated_at" db:"updated_at"`
CreatedAt time.Time `json:"created_at" db:"created_at"`
TableName string `json:"table_name" db:"table_name"`
TTL int `json:"ttl" db:"ttl"`
Status string `json:"status" db:"status"`
}
type ChannelItem struct {
Id int `json:"id" db:"id"`
CreatedAt time.Time `json:"created_at" db:"created_at"`
......@@ -260,10 +269,11 @@ type DBResponseTTL struct {
}
type GetTTLResponseItem struct {
MetricsTime int `json:"metrics_ttl_duration_hrs,omitempty"`
MetricsMoveTime int `json:"metrics_move_ttl_duration_hrs,omitempty"`
TracesTime int `json:"traces_ttl_duration_hrs,omitempty"`
TracesMoveTime int `json:"traces_move_ttl_duration_hrs,omitempty"`
MetricsTime int `json:"metrics_ttl_duration_hrs,omitempty"`
MetricsMoveTime int `json:"metrics_move_ttl_duration_hrs,omitempty"`
TracesTime int `json:"traces_ttl_duration_hrs,omitempty"`
TracesMoveTime int `json:"traces_move_ttl_duration_hrs,omitempty"`
Status string `json:"status,omitempty"`
}
type DBResponseServiceName struct {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment