From a7ceb9d5e73a84265930485fc34fc3c0aa37786b Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Thu, 12 Sep 2024 08:46:09 +0530 Subject: [PATCH 1/2] chore: move channel management from CH reader to rule DB --- .../app/clickhouseReader/reader.go | 238 +--------------- pkg/query-service/app/http_handler.go | 12 +- .../integrations/alertManager/manager.go | 76 +++-- .../integrations/alertManager/notifier.go | 2 +- pkg/query-service/interfaces/interface.go | 7 - pkg/query-service/model/response.go | 1 + pkg/query-service/rules/db.go | 260 +++++++++++++++++- pkg/query-service/rules/manager.go | 7 +- pkg/query-service/telemetry/telemetry.go | 89 +++--- 9 files changed, 357 insertions(+), 335 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 2984fa0fa5..44a3118c6b 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -164,7 +164,7 @@ func NewReaderFromClickhouseConnection( featureFlag interfaces.FeatureLookup, cluster string, ) *ClickHouseReader { - alertManager, err := am.New("") + alertManager, err := am.New() if err != nil { zap.L().Error("failed to initialize alert manager", zap.Error(err)) zap.L().Error("check if the alert manager URL is correctly set and valid") @@ -414,242 +414,6 @@ func (r *ClickHouseReader) LoadChannel(channel *model.ChannelItem) *model.ApiErr return nil } -func (r *ClickHouseReader) GetChannel(id string) (*model.ChannelItem, *model.ApiError) { - - idInt, _ := strconv.Atoi(id) - channel := model.ChannelItem{} - - query := "SELECT id, created_at, updated_at, name, type, data data FROM notification_channels WHERE id=? " - - stmt, err := r.localDB.Preparex(query) - - if err != nil { - zap.L().Error("Error in preparing sql query for GetChannel", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err} - } - - err = stmt.Get(&channel, idInt) - - if err != nil { - zap.L().Error("Error in getting channel with id", zap.Int("id", idInt), zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err} - } - - return &channel, nil - -} - -func (r *ClickHouseReader) DeleteChannel(id string) *model.ApiError { - - idInt, _ := strconv.Atoi(id) - - channelToDelete, apiErrorObj := r.GetChannel(id) - - if apiErrorObj != nil { - return apiErrorObj - } - - tx, err := r.localDB.Begin() - if err != nil { - return &model.ApiError{Typ: model.ErrorInternal, Err: err} - } - - { - stmt, err := tx.Prepare(`DELETE FROM notification_channels WHERE id=$1;`) - if err != nil { - zap.L().Error("Error in preparing statement for INSERT to notification_channels", zap.Error(err)) - tx.Rollback() - return &model.ApiError{Typ: model.ErrorInternal, Err: err} - } - defer stmt.Close() - - if _, err := stmt.Exec(idInt); err != nil { - zap.L().Error("Error in Executing prepared statement for INSERT to notification_channels", zap.Error(err)) - tx.Rollback() // return an error too, we may want to wrap them - return &model.ApiError{Typ: model.ErrorInternal, Err: err} - } - } - - apiError := r.alertManager.DeleteRoute(channelToDelete.Name) - if apiError != nil { - tx.Rollback() - return apiError - } - - err = tx.Commit() - if err != nil { - zap.L().Error("Error in committing transaction for DELETE command to notification_channels", zap.Error(err)) - return &model.ApiError{Typ: model.ErrorInternal, Err: err} - } - - return nil - -} - -func (r *ClickHouseReader) GetChannels() (*[]model.ChannelItem, *model.ApiError) { - - channels := []model.ChannelItem{} - - query := "SELECT id, created_at, updated_at, name, type, data data FROM notification_channels" - - err := r.localDB.Select(&channels, query) - - zap.L().Info(query) - - if err != nil { - zap.L().Error("Error in processing sql query", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err} - } - - return &channels, nil - -} - -func getChannelType(receiver *am.Receiver) string { - - if receiver.EmailConfigs != nil { - return "email" - } - if receiver.OpsGenieConfigs != nil { - return "opsgenie" - } - if receiver.PagerdutyConfigs != nil { - return "pagerduty" - } - if receiver.PushoverConfigs != nil { - return "pushover" - } - if receiver.SNSConfigs != nil { - return "sns" - } - if receiver.SlackConfigs != nil { - return "slack" - } - if receiver.VictorOpsConfigs != nil { - return "victorops" - } - if receiver.WebhookConfigs != nil { - return "webhook" - } - if receiver.WechatConfigs != nil { - return "wechat" - } - if receiver.MSTeamsConfigs != nil { - return "msteams" - } - return "" -} - -func (r *ClickHouseReader) EditChannel(receiver *am.Receiver, id string) (*am.Receiver, *model.ApiError) { - - idInt, _ := strconv.Atoi(id) - - channel, apiErrObj := r.GetChannel(id) - - if apiErrObj != nil { - return nil, apiErrObj - } - if channel.Name != receiver.Name { - return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("channel name cannot be changed")} - } - - tx, err := r.localDB.Begin() - if err != nil { - return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err} - } - - channel_type := getChannelType(receiver) - - // check if channel type is supported in the current user plan - if err := r.featureFlags.CheckFeature(fmt.Sprintf("ALERT_CHANNEL_%s", strings.ToUpper(channel_type))); err != nil { - zap.L().Warn("an unsupported feature was blocked", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("unsupported feature. please upgrade your plan to access this feature")} - } - - receiverString, _ := json.Marshal(receiver) - - { - stmt, err := tx.Prepare(`UPDATE notification_channels SET updated_at=$1, type=$2, data=$3 WHERE id=$4;`) - - if err != nil { - zap.L().Error("Error in preparing statement for UPDATE to notification_channels", zap.Error(err)) - tx.Rollback() - return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err} - } - defer stmt.Close() - - if _, err := stmt.Exec(time.Now(), channel_type, string(receiverString), idInt); err != nil { - zap.L().Error("Error in Executing prepared statement for UPDATE to notification_channels", zap.Error(err)) - tx.Rollback() // return an error too, we may want to wrap them - return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err} - } - } - - apiError := r.alertManager.EditRoute(receiver) - if apiError != nil { - tx.Rollback() - return nil, apiError - } - - err = tx.Commit() - if err != nil { - zap.L().Error("Error in committing transaction for INSERT to notification_channels", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err} - } - - return receiver, nil - -} - -func (r *ClickHouseReader) CreateChannel(receiver *am.Receiver) (*am.Receiver, *model.ApiError) { - - channel_type := getChannelType(receiver) - - // check if channel type is supported in the current user plan - if err := r.featureFlags.CheckFeature(fmt.Sprintf("ALERT_CHANNEL_%s", strings.ToUpper(channel_type))); err != nil { - zap.L().Warn("an unsupported feature was blocked", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("unsupported feature. please upgrade your plan to access this feature")} - } - - receiverString, _ := json.Marshal(receiver) - - tx, err := r.localDB.Begin() - if err != nil { - return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err} - } - - { - stmt, err := tx.Prepare(`INSERT INTO notification_channels (created_at, updated_at, name, type, data) VALUES($1,$2,$3,$4,$5);`) - if err != nil { - zap.L().Error("Error in preparing statement for INSERT to notification_channels", zap.Error(err)) - tx.Rollback() - return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err} - } - defer stmt.Close() - - if _, err := stmt.Exec(time.Now(), time.Now(), receiver.Name, channel_type, string(receiverString)); err != nil { - zap.L().Error("Error in Executing prepared statement for INSERT to notification_channels", zap.Error(err)) - tx.Rollback() // return an error too, we may want to wrap them - return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err} - } - } - - apiError := r.alertManager.AddRoute(receiver) - if apiError != nil { - tx.Rollback() - return nil, apiError - } - - err = tx.Commit() - if err != nil { - zap.L().Error("Error in committing transaction for INSERT to notification_channels", zap.Error(err)) - return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err} - } - - return receiver, nil - -} - func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, queryParams *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) { qry, err := r.queryEngine.NewInstantQuery(ctx, r.remoteStorage, nil, queryParams.Query, queryParams.Time) if err != nil { diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 957ea5aaff..51d459d1a3 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -145,7 +145,7 @@ type APIHandlerOpts struct { // NewAPIHandler returns an APIHandler func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { - alertManager, err := am.New("") + alertManager, err := am.New() if err != nil { return nil, err } @@ -1210,7 +1210,7 @@ func (aH *APIHandler) editRule(w http.ResponseWriter, r *http.Request) { func (aH *APIHandler) getChannel(w http.ResponseWriter, r *http.Request) { id := mux.Vars(r)["id"] - channel, apiErrorObj := aH.reader.GetChannel(id) + channel, apiErrorObj := aH.ruleManager.RuleDB().GetChannel(id) if apiErrorObj != nil { RespondError(w, apiErrorObj, nil) return @@ -1220,7 +1220,7 @@ func (aH *APIHandler) getChannel(w http.ResponseWriter, r *http.Request) { func (aH *APIHandler) deleteChannel(w http.ResponseWriter, r *http.Request) { id := mux.Vars(r)["id"] - apiErrorObj := aH.reader.DeleteChannel(id) + apiErrorObj := aH.ruleManager.RuleDB().DeleteChannel(id) if apiErrorObj != nil { RespondError(w, apiErrorObj, nil) return @@ -1229,7 +1229,7 @@ func (aH *APIHandler) deleteChannel(w http.ResponseWriter, r *http.Request) { } func (aH *APIHandler) listChannels(w http.ResponseWriter, r *http.Request) { - channels, apiErrorObj := aH.reader.GetChannels() + channels, apiErrorObj := aH.ruleManager.RuleDB().GetChannels() if apiErrorObj != nil { RespondError(w, apiErrorObj, nil) return @@ -1282,7 +1282,7 @@ func (aH *APIHandler) editChannel(w http.ResponseWriter, r *http.Request) { return } - _, apiErrorObj := aH.reader.EditChannel(receiver, id) + _, apiErrorObj := aH.ruleManager.RuleDB().EditChannel(receiver, id) if apiErrorObj != nil { RespondError(w, apiErrorObj, nil) @@ -1310,7 +1310,7 @@ func (aH *APIHandler) createChannel(w http.ResponseWriter, r *http.Request) { return } - _, apiErrorObj := aH.reader.CreateChannel(receiver) + _, apiErrorObj := aH.ruleManager.RuleDB().CreateChannel(receiver) if apiErrorObj != nil { RespondError(w, apiErrorObj, nil) diff --git a/pkg/query-service/integrations/alertManager/manager.go b/pkg/query-service/integrations/alertManager/manager.go index d80893010e..10db4debd7 100644 --- a/pkg/query-service/integrations/alertManager/manager.go +++ b/pkg/query-service/integrations/alertManager/manager.go @@ -24,42 +24,62 @@ type Manager interface { TestReceiver(receiver *Receiver) *model.ApiError } -func New(url string) (Manager, error) { - - if url == "" { - url = constants.GetAlertManagerApiPrefix() +func defaultOptions() []ManagerOptions { + return []ManagerOptions{ + WithURL(constants.GetAlertManagerApiPrefix()), + WithChannelApiPath(constants.AmChannelApiPath), } +} - urlParsed, err := neturl.Parse(url) - if err != nil { - return nil, err +type ManagerOptions func(m *manager) error + +func New(opts ...ManagerOptions) (Manager, error) { + m := &manager{} + + newOpts := defaultOptions() + newOpts = append(newOpts, opts...) + + for _, opt := range newOpts { + err := opt(m) + if err != nil { + return nil, err + } } - return &manager{ - url: url, - parsedURL: urlParsed, - }, nil + return m, nil } -type manager struct { - url string - parsedURL *neturl.URL +func WithURL(url string) ManagerOptions { + return func(m *manager) error { + m.url = url + parsedURL, err := neturl.Parse(url) + if err != nil { + return err + } + m.parsedURL = parsedURL + return nil + } } -func prepareAmChannelApiURL() string { - basePath := constants.GetAlertManagerApiPrefix() - AmChannelApiPath := constants.AmChannelApiPath - - if len(AmChannelApiPath) > 0 && rune(AmChannelApiPath[0]) == rune('/') { - AmChannelApiPath = AmChannelApiPath[1:] +func WithChannelApiPath(path string) ManagerOptions { + return func(m *manager) error { + m.channelApiPath = path + return nil } +} + +type manager struct { + url string + parsedURL *neturl.URL + channelApiPath string +} - return fmt.Sprintf("%s%s", basePath, AmChannelApiPath) +func (m *manager) prepareAmChannelApiURL() string { + return fmt.Sprintf("%s%s", m.url, m.channelApiPath) } -func prepareTestApiURL() string { - basePath := constants.GetAlertManagerApiPrefix() - return fmt.Sprintf("%s%s", basePath, "v1/testReceiver") +func (m *manager) prepareTestApiURL() string { + return fmt.Sprintf("%s%s", m.url, "v1/testReceiver") } func (m *manager) URL() *neturl.URL { @@ -79,7 +99,7 @@ func (m *manager) AddRoute(receiver *Receiver) *model.ApiError { receiverString, _ := json.Marshal(receiver) - amURL := prepareAmChannelApiURL() + amURL := m.prepareAmChannelApiURL() response, err := http.Post(amURL, contentType, bytes.NewBuffer(receiverString)) if err != nil { @@ -97,7 +117,7 @@ func (m *manager) AddRoute(receiver *Receiver) *model.ApiError { func (m *manager) EditRoute(receiver *Receiver) *model.ApiError { receiverString, _ := json.Marshal(receiver) - amURL := prepareAmChannelApiURL() + amURL := m.prepareAmChannelApiURL() req, err := http.NewRequest(http.MethodPut, amURL, bytes.NewBuffer(receiverString)) if err != nil { @@ -126,7 +146,7 @@ func (m *manager) DeleteRoute(name string) *model.ApiError { values := map[string]string{"name": name} requestData, _ := json.Marshal(values) - amURL := prepareAmChannelApiURL() + amURL := m.prepareAmChannelApiURL() req, err := http.NewRequest(http.MethodDelete, amURL, bytes.NewBuffer(requestData)) if err != nil { @@ -156,7 +176,7 @@ func (m *manager) TestReceiver(receiver *Receiver) *model.ApiError { receiverBytes, _ := json.Marshal(receiver) - amTestURL := prepareTestApiURL() + amTestURL := m.prepareTestApiURL() response, err := http.Post(amTestURL, contentType, bytes.NewBuffer(receiverBytes)) if err != nil { diff --git a/pkg/query-service/integrations/alertManager/notifier.go b/pkg/query-service/integrations/alertManager/notifier.go index e29879f10a..434e2bc112 100644 --- a/pkg/query-service/integrations/alertManager/notifier.go +++ b/pkg/query-service/integrations/alertManager/notifier.go @@ -295,7 +295,7 @@ func newAlertmanagerSet(urls []string, timeout time.Duration, logger log.Logger) ams := []Manager{} for _, u := range urls { - am, err := New(u) + am, err := New(WithURL(u)) if err != nil { level.Error(s.logger).Log(fmt.Sprintf("invalid alert manager url %s: %s", u, err)) } else { diff --git a/pkg/query-service/interfaces/interface.go b/pkg/query-service/interfaces/interface.go index cfb4f9159e..a28a3f12a1 100644 --- a/pkg/query-service/interfaces/interface.go +++ b/pkg/query-service/interfaces/interface.go @@ -8,18 +8,11 @@ import ( "github.com/prometheus/prometheus/promql" "github.com/prometheus/prometheus/storage" "github.com/prometheus/prometheus/util/stats" - am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager" "go.signoz.io/signoz/pkg/query-service/model" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" ) type Reader interface { - GetChannel(id string) (*model.ChannelItem, *model.ApiError) - GetChannels() (*[]model.ChannelItem, *model.ApiError) - DeleteChannel(id string) *model.ApiError - CreateChannel(receiver *am.Receiver) (*am.Receiver, *model.ApiError) - EditChannel(receiver *am.Receiver, id string) (*am.Receiver, *model.ApiError) - GetInstantQueryMetricsResult(ctx context.Context, query *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) GetQueryRangeResult(ctx context.Context, query *model.QueryRangeParams) (*promql.Result, *stats.QueryStats, *model.ApiError) GetServiceOverview(ctx context.Context, query *model.GetServiceOverviewParams, skipConfig *model.SkipConfig) (*[]model.ServiceOverviewItem, *model.ApiError) diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index 83df872175..3ec108be3e 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -638,6 +638,7 @@ type AlertsInfo struct { LogsBasedAlerts int `json:"logsBasedAlerts"` MetricBasedAlerts int `json:"metricBasedAlerts"` TracesBasedAlerts int `json:"tracesBasedAlerts"` + TotalChannels int `json:"totalChannels"` SlackChannels int `json:"slackChannels"` WebHookChannels int `json:"webHookChannels"` PagerDutyChannels int `json:"pagerDutyChannels"` diff --git a/pkg/query-service/rules/db.go b/pkg/query-service/rules/db.go index d9a9be195c..08283412e7 100644 --- a/pkg/query-service/rules/db.go +++ b/pkg/query-service/rules/db.go @@ -11,6 +11,7 @@ import ( "github.com/jmoiron/sqlx" "go.signoz.io/signoz/pkg/query-service/auth" "go.signoz.io/signoz/pkg/query-service/common" + am "go.signoz.io/signoz/pkg/query-service/integrations/alertManager" "go.signoz.io/signoz/pkg/query-service/model" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" "go.uber.org/zap" @@ -18,6 +19,12 @@ import ( // Data store to capture user alert rule settings type RuleDB interface { + GetChannel(id string) (*model.ChannelItem, *model.ApiError) + GetChannels() (*[]model.ChannelItem, *model.ApiError) + DeleteChannel(id string) *model.ApiError + CreateChannel(receiver *am.Receiver) (*am.Receiver, *model.ApiError) + EditChannel(receiver *am.Receiver, id string) (*am.Receiver, *model.ApiError) + // CreateRuleTx stores rule in the db and returns tx and group name (on success) CreateRuleTx(ctx context.Context, rule string) (int64, Tx, error) @@ -68,13 +75,15 @@ type Tx interface { type ruleDB struct { *sqlx.DB + alertManager am.Manager } // todo: move init methods for creating tables -func NewRuleDB(db *sqlx.DB) RuleDB { +func NewRuleDB(db *sqlx.DB, alertManager am.Manager) RuleDB { return &ruleDB{ db, + alertManager, } } @@ -303,6 +312,229 @@ func (r *ruleDB) EditPlannedMaintenance(ctx context.Context, maintenance Planned return "", nil } +func getChannelType(receiver *am.Receiver) string { + + if receiver.EmailConfigs != nil { + return "email" + } + if receiver.OpsGenieConfigs != nil { + return "opsgenie" + } + if receiver.PagerdutyConfigs != nil { + return "pagerduty" + } + if receiver.PushoverConfigs != nil { + return "pushover" + } + if receiver.SNSConfigs != nil { + return "sns" + } + if receiver.SlackConfigs != nil { + return "slack" + } + if receiver.VictorOpsConfigs != nil { + return "victorops" + } + if receiver.WebhookConfigs != nil { + return "webhook" + } + if receiver.WechatConfigs != nil { + return "wechat" + } + if receiver.MSTeamsConfigs != nil { + return "msteams" + } + return "" +} + +func (r *ruleDB) GetChannel(id string) (*model.ChannelItem, *model.ApiError) { + + idInt, _ := strconv.Atoi(id) + channel := model.ChannelItem{} + + query := "SELECT id, created_at, updated_at, name, type, data data FROM notification_channels WHERE id=?;" + + stmt, err := r.Preparex(query) + + if err != nil { + zap.L().Error("Error in preparing sql query for GetChannel", zap.Error(err)) + return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err} + } + + err = stmt.Get(&channel, idInt) + + if err != nil { + zap.L().Error("Error in getting channel with id", zap.Int("id", idInt), zap.Error(err)) + return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err} + } + + return &channel, nil +} + +func (r *ruleDB) DeleteChannel(id string) *model.ApiError { + + idInt, _ := strconv.Atoi(id) + + channelToDelete, apiErrorObj := r.GetChannel(id) + + if apiErrorObj != nil { + return apiErrorObj + } + + tx, err := r.Begin() + if err != nil { + return &model.ApiError{Typ: model.ErrorInternal, Err: err} + } + + { + stmt, err := tx.Prepare(`DELETE FROM notification_channels WHERE id=$1;`) + if err != nil { + zap.L().Error("Error in preparing statement for INSERT to notification_channels", zap.Error(err)) + tx.Rollback() + return &model.ApiError{Typ: model.ErrorInternal, Err: err} + } + defer stmt.Close() + + if _, err := stmt.Exec(idInt); err != nil { + zap.L().Error("Error in Executing prepared statement for INSERT to notification_channels", zap.Error(err)) + tx.Rollback() // return an error too, we may want to wrap them + return &model.ApiError{Typ: model.ErrorInternal, Err: err} + } + } + + apiError := r.alertManager.DeleteRoute(channelToDelete.Name) + if apiError != nil { + tx.Rollback() + return apiError + } + + err = tx.Commit() + if err != nil { + zap.L().Error("Error in committing transaction for DELETE command to notification_channels", zap.Error(err)) + return &model.ApiError{Typ: model.ErrorInternal, Err: err} + } + + return nil + +} + +func (r *ruleDB) GetChannels() (*[]model.ChannelItem, *model.ApiError) { + + channels := []model.ChannelItem{} + + query := "SELECT id, created_at, updated_at, name, type, data data FROM notification_channels" + + err := r.Select(&channels, query) + + zap.L().Info(query) + + if err != nil { + zap.L().Error("Error in processing sql query", zap.Error(err)) + return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err} + } + + return &channels, nil + +} + +func (r *ruleDB) EditChannel(receiver *am.Receiver, id string) (*am.Receiver, *model.ApiError) { + + idInt, _ := strconv.Atoi(id) + + channel, apiErrObj := r.GetChannel(id) + + if apiErrObj != nil { + return nil, apiErrObj + } + if channel.Name != receiver.Name { + return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("channel name cannot be changed")} + } + + tx, err := r.Begin() + if err != nil { + return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err} + } + + channel_type := getChannelType(receiver) + + receiverString, _ := json.Marshal(receiver) + + { + stmt, err := tx.Prepare(`UPDATE notification_channels SET updated_at=$1, type=$2, data=$3 WHERE id=$4;`) + + if err != nil { + zap.L().Error("Error in preparing statement for UPDATE to notification_channels", zap.Error(err)) + tx.Rollback() + return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err} + } + defer stmt.Close() + + if _, err := stmt.Exec(time.Now(), channel_type, string(receiverString), idInt); err != nil { + zap.L().Error("Error in Executing prepared statement for UPDATE to notification_channels", zap.Error(err)) + tx.Rollback() // return an error too, we may want to wrap them + return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err} + } + } + + apiError := r.alertManager.EditRoute(receiver) + if apiError != nil { + tx.Rollback() + return nil, apiError + } + + err = tx.Commit() + if err != nil { + zap.L().Error("Error in committing transaction for INSERT to notification_channels", zap.Error(err)) + return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err} + } + + return receiver, nil + +} + +func (r *ruleDB) CreateChannel(receiver *am.Receiver) (*am.Receiver, *model.ApiError) { + + channel_type := getChannelType(receiver) + + receiverString, _ := json.Marshal(receiver) + + tx, err := r.Begin() + if err != nil { + return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err} + } + + { + stmt, err := tx.Prepare(`INSERT INTO notification_channels (created_at, updated_at, name, type, data) VALUES($1,$2,$3,$4,$5);`) + if err != nil { + zap.L().Error("Error in preparing statement for INSERT to notification_channels", zap.Error(err)) + tx.Rollback() + return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err} + } + defer stmt.Close() + + if _, err := stmt.Exec(time.Now(), time.Now(), receiver.Name, channel_type, string(receiverString)); err != nil { + zap.L().Error("Error in Executing prepared statement for INSERT to notification_channels", zap.Error(err)) + tx.Rollback() // return an error too, we may want to wrap them + return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err} + } + } + + apiError := r.alertManager.AddRoute(receiver) + if apiError != nil { + tx.Rollback() + return nil, apiError + } + + err = tx.Commit() + if err != nil { + zap.L().Error("Error in committing transaction for INSERT to notification_channels", zap.Error(err)) + return nil, &model.ApiError{Typ: model.ErrorInternal, Err: err} + } + + return receiver, nil + +} + func (r *ruleDB) GetAlertsInfo(ctx context.Context) (*model.AlertsInfo, error) { alertsInfo := model.AlertsInfo{} // fetch alerts from rules db @@ -349,5 +581,31 @@ func (r *ruleDB) GetAlertsInfo(ctx context.Context) (*model.AlertsInfo, error) { alertsInfo.TotalAlerts = alertsInfo.TotalAlerts + 1 } alertsInfo.AlertNames = alertNames + + channels, _ := r.GetChannels() + if channels != nil { + alertsInfo.TotalChannels = len(*channels) + for _, channel := range *channels { + if channel.Type == "slack" { + alertsInfo.SlackChannels = alertsInfo.SlackChannels + 1 + } + if channel.Type == "webhook" { + alertsInfo.WebHookChannels = alertsInfo.WebHookChannels + 1 + } + if channel.Type == "email" { + alertsInfo.EmailChannels = alertsInfo.EmailChannels + 1 + } + if channel.Type == "pagerduty" { + alertsInfo.PagerDutyChannels = alertsInfo.PagerDutyChannels + 1 + } + if channel.Type == "opsgenie" { + alertsInfo.OpsGenieChannels = alertsInfo.OpsGenieChannels + 1 + } + if channel.Type == "msteams" { + alertsInfo.MSTeamsChannels = alertsInfo.MSTeamsChannels + 1 + } + } + } + return &alertsInfo, nil } diff --git a/pkg/query-service/rules/manager.go b/pkg/query-service/rules/manager.go index 120d674a9a..04cb38ba13 100644 --- a/pkg/query-service/rules/manager.go +++ b/pkg/query-service/rules/manager.go @@ -183,7 +183,12 @@ func NewManager(o *ManagerOptions) (*Manager, error) { return nil, err } - db := NewRuleDB(o.DBConn) + amManager, err := am.New() + if err != nil { + return nil, err + } + + db := NewRuleDB(o.DBConn, amManager) telemetry.GetInstance().SetAlertsInfoCallback(db.GetAlertsInfo) diff --git a/pkg/query-service/telemetry/telemetry.go b/pkg/query-service/telemetry/telemetry.go index 7f282ea3f9..bba904be13 100644 --- a/pkg/query-service/telemetry/telemetry.go +++ b/pkg/query-service/telemetry/telemetry.go @@ -310,63 +310,44 @@ func createTelemetry() { if err == nil { dashboardsInfo, err := telemetry.reader.GetDashboardsInfo(ctx) if err == nil { - channels, err := telemetry.reader.GetChannels() + savedViewsInfo, err := telemetry.reader.GetSavedViewsInfo(ctx) if err == nil { - for _, channel := range *channels { - switch channel.Type { - case "slack": - alertsInfo.SlackChannels++ - case "webhook": - alertsInfo.WebHookChannels++ - case "pagerduty": - alertsInfo.PagerDutyChannels++ - case "opsgenie": - alertsInfo.OpsGenieChannels++ - case "email": - alertsInfo.EmailChannels++ - case "msteams": - alertsInfo.MSTeamsChannels++ - } + dashboardsAlertsData := map[string]interface{}{ + "totalDashboards": dashboardsInfo.TotalDashboards, + "totalDashboardsWithPanelAndName": dashboardsInfo.TotalDashboardsWithPanelAndName, + "dashboardNames": dashboardsInfo.DashboardNames, + "alertNames": alertsInfo.AlertNames, + "logsBasedPanels": dashboardsInfo.LogsBasedPanels, + "metricBasedPanels": dashboardsInfo.MetricBasedPanels, + "tracesBasedPanels": dashboardsInfo.TracesBasedPanels, + "dashboardsWithTSV2": dashboardsInfo.QueriesWithTSV2, + "totalAlerts": alertsInfo.TotalAlerts, + "alertsWithTSV2": alertsInfo.AlertsWithTSV2, + "logsBasedAlerts": alertsInfo.LogsBasedAlerts, + "metricBasedAlerts": alertsInfo.MetricBasedAlerts, + "tracesBasedAlerts": alertsInfo.TracesBasedAlerts, + "totalChannels": alertsInfo.TotalChannels, + "totalSavedViews": savedViewsInfo.TotalSavedViews, + "logsSavedViews": savedViewsInfo.LogsSavedViews, + "tracesSavedViews": savedViewsInfo.TracesSavedViews, + "slackChannels": alertsInfo.SlackChannels, + "webHookChannels": alertsInfo.WebHookChannels, + "pagerDutyChannels": alertsInfo.PagerDutyChannels, + "opsGenieChannels": alertsInfo.OpsGenieChannels, + "emailChannels": alertsInfo.EmailChannels, + "msteamsChannels": alertsInfo.MSTeamsChannels, + "metricsBuilderQueries": alertsInfo.MetricsBuilderQueries, + "metricsClickHouseQueries": alertsInfo.MetricsClickHouseQueries, + "metricsPrometheusQueries": alertsInfo.MetricsPrometheusQueries, + "spanMetricsPrometheusQueries": alertsInfo.SpanMetricsPrometheusQueries, } - savedViewsInfo, err := telemetry.reader.GetSavedViewsInfo(ctx) - if err == nil { - dashboardsAlertsData := map[string]interface{}{ - "totalDashboards": dashboardsInfo.TotalDashboards, - "totalDashboardsWithPanelAndName": dashboardsInfo.TotalDashboardsWithPanelAndName, - "dashboardNames": dashboardsInfo.DashboardNames, - "alertNames": alertsInfo.AlertNames, - "logsBasedPanels": dashboardsInfo.LogsBasedPanels, - "metricBasedPanels": dashboardsInfo.MetricBasedPanels, - "tracesBasedPanels": dashboardsInfo.TracesBasedPanels, - "dashboardsWithTSV2": dashboardsInfo.QueriesWithTSV2, - "totalAlerts": alertsInfo.TotalAlerts, - "alertsWithTSV2": alertsInfo.AlertsWithTSV2, - "logsBasedAlerts": alertsInfo.LogsBasedAlerts, - "metricBasedAlerts": alertsInfo.MetricBasedAlerts, - "tracesBasedAlerts": alertsInfo.TracesBasedAlerts, - "totalChannels": len(*channels), - "totalSavedViews": savedViewsInfo.TotalSavedViews, - "logsSavedViews": savedViewsInfo.LogsSavedViews, - "tracesSavedViews": savedViewsInfo.TracesSavedViews, - "slackChannels": alertsInfo.SlackChannels, - "webHookChannels": alertsInfo.WebHookChannels, - "pagerDutyChannels": alertsInfo.PagerDutyChannels, - "opsGenieChannels": alertsInfo.OpsGenieChannels, - "emailChannels": alertsInfo.EmailChannels, - "msteamsChannels": alertsInfo.MSTeamsChannels, - "metricsBuilderQueries": alertsInfo.MetricsBuilderQueries, - "metricsClickHouseQueries": alertsInfo.MetricsClickHouseQueries, - "metricsPrometheusQueries": alertsInfo.MetricsPrometheusQueries, - "spanMetricsPrometheusQueries": alertsInfo.SpanMetricsPrometheusQueries, - } - // send event only if there are dashboards or alerts or channels - if (dashboardsInfo.TotalDashboards > 0 || alertsInfo.TotalAlerts > 0 || len(*channels) > 0 || savedViewsInfo.TotalSavedViews > 0) && apiErr == nil { - for _, user := range users { - if user.Email == DEFAULT_CLOUD_EMAIL { - continue - } - telemetry.SendEvent(TELEMETRY_EVENT_DASHBOARDS_ALERTS, dashboardsAlertsData, user.Email, false, false) + // send event only if there are dashboards or alerts or channels + if (dashboardsInfo.TotalDashboards > 0 || alertsInfo.TotalAlerts > 0 || alertsInfo.TotalChannels > 0 || savedViewsInfo.TotalSavedViews > 0) && apiErr == nil { + for _, user := range users { + if user.Email == DEFAULT_CLOUD_EMAIL { + continue } + telemetry.SendEvent(TELEMETRY_EVENT_DASHBOARDS_ALERTS, dashboardsAlertsData, user.Email, false, false) } } } From 60fb32a04499dc24c8eed09cb4246d58197a36c8 Mon Sep 17 00:00:00 2001 From: Srikanth Chekuri Date: Sat, 14 Sep 2024 13:46:47 +0530 Subject: [PATCH 2/2] chore: remove some code --- .../app/clickhouseReader/reader.go | 28 ------------------- pkg/query-service/telemetry/telemetry.go | 6 ++-- 2 files changed, 2 insertions(+), 32 deletions(-) diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index 5a56d256ec..ca634ef3f6 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -1,15 +1,12 @@ package clickhouseReader import ( - "bytes" "context" "database/sql" "encoding/json" "fmt" - "io" "math" "math/rand" - "net/http" "os" "reflect" "regexp" @@ -410,31 +407,6 @@ func (r *ClickHouseReader) GetConn() clickhouse.Conn { return r.db } -func (r *ClickHouseReader) LoadChannel(channel *model.ChannelItem) *model.ApiError { - - receiver := &am.Receiver{} - if err := json.Unmarshal([]byte(channel.Data), receiver); err != nil { // Parse []byte to go struct pointer - return &model.ApiError{Typ: model.ErrorBadData, Err: err} - } - - response, err := http.Post(constants.GetAlertManagerApiPrefix()+"v1/receivers", "application/json", bytes.NewBuffer([]byte(channel.Data))) - - if err != nil { - zap.L().Error("Error in getting response of API call to alertmanager/v1/receivers", zap.Error(err)) - return &model.ApiError{Typ: model.ErrorInternal, Err: err} - } - if response.StatusCode > 299 { - responseData, _ := io.ReadAll(response.Body) - - err := fmt.Errorf("error in getting 2xx response in API call to alertmanager/v1/receivers") - zap.L().Error("Error in getting 2xx response in API call to alertmanager/v1/receivers", zap.String("Status", response.Status), zap.String("Data", string(responseData))) - - return &model.ApiError{Typ: model.ErrorInternal, Err: err} - } - - return nil -} - func (r *ClickHouseReader) GetInstantQueryMetricsResult(ctx context.Context, queryParams *model.InstantQueryMetricsParams) (*promql.Result, *stats.QueryStats, *model.ApiError) { qry, err := r.queryEngine.NewInstantQuery(ctx, r.remoteStorage, nil, queryParams.Query, queryParams.Time) if err != nil { diff --git a/pkg/query-service/telemetry/telemetry.go b/pkg/query-service/telemetry/telemetry.go index 57382e3737..be6ad4719c 100644 --- a/pkg/query-service/telemetry/telemetry.go +++ b/pkg/query-service/telemetry/telemetry.go @@ -448,11 +448,9 @@ func getOutboundIP() string { } defer resp.Body.Close() + ipBody, err := io.ReadAll(resp.Body) if err == nil { - ipBody, err := io.ReadAll(resp.Body) - if err == nil { - ip = ipBody - } + ip = ipBody } return string(ip)