From 86d42e1c5b19b21c3b563294288039a0aafb8c2c Mon Sep 17 00:00:00 2001 From: Aditya Thebe Date: Wed, 3 Jan 2024 12:49:11 +0545 Subject: [PATCH] feat: move check details query to duty --- cmd/root.go | 6 +- go.mod | 6 +- pkg/api.go | 60 +-------------- pkg/api/api.go | 36 +++------ pkg/cache/cache.go | 135 --------------------------------- pkg/cache/postgres_query.go | 144 ------------------------------------ pkg/cache/postgres_util.go | 28 ------- pkg/jobs/canary/status.go | 7 +- pkg/metrics/metrics.go | 22 +++--- 9 files changed, 34 insertions(+), 410 deletions(-) delete mode 100644 pkg/cache/cache.go delete mode 100644 pkg/cache/postgres_query.go diff --git a/cmd/root.go b/cmd/root.go index 4cba422ce..d315ab00f 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -6,7 +6,6 @@ import ( "github.com/flanksource/canary-checker/checks" "github.com/flanksource/canary-checker/pkg" - "github.com/flanksource/canary-checker/pkg/cache" "github.com/flanksource/canary-checker/pkg/db" "github.com/flanksource/canary-checker/pkg/jobs/canary" "github.com/flanksource/canary-checker/pkg/prometheus" @@ -16,6 +15,7 @@ import ( "github.com/flanksource/commons/logger" "github.com/flanksource/duty" "github.com/flanksource/duty/context" + "github.com/flanksource/duty/query" "github.com/spf13/cobra" "github.com/spf13/pflag" "go.opentelemetry.io/otel" @@ -97,12 +97,12 @@ func ServerFlags(flags *pflag.FlagSet) { flags.StringSliceVar(&runner.IncludeCanaries, "include-check", []string{}, "Run matching canaries - useful for debugging") flags.StringSliceVar(&runner.IncludeTypes, "include-type", []string{}, "Check type to disable") flags.StringSliceVar(&runner.IncludeNamespaces, "include-namespace", []string{}, "Check type to disable") - flags.IntVar(&cache.DefaultCacheCount, "maxStatusCheckCount", 5, "Maximum number of past checks in the in memory cache") + flags.IntVar(&query.DefaultCacheCount, "maxStatusCheckCount", 5, "Maximum number of past checks in the in memory cache") flags.StringVar(&runner.RunnerName, "name", "local", "Server name shown in aggregate dashboard") flags.StringVar(&prometheus.PrometheusURL, "prometheus", "", "URL of the prometheus server that is scraping this instance") flags.StringVar(&db.ConnectionString, "db", "DB_URL", "Connection string for the postgres database. Use embedded:///path/to/dir to use the embedded database") flags.IntVar(&db.DefaultExpiryDays, "cache-timeout", 90, "Cache timeout in days") - flags.StringVarP(&cache.DefaultWindow, "default-window", "", "1h", "Default search window") + flags.StringVarP(&query.DefaultCheckQueryWindow, "default-window", "", "1h", "Default search window") flags.IntVar(&db.CheckStatusRetention, "check-status-retention-period", db.CheckStatusRetention, "Check status retention period in days") flags.IntVar(&topology.CheckRetentionDays, "check-retention-period", topology.DefaultCheckRetentionDays, "Check retention period in days") flags.IntVar(&topology.CanaryRetentionDays, "canary-retention-period", topology.DefaultCanaryRetentionDays, "Canary retention period in days") diff --git a/go.mod b/go.mod index 8de91403a..7d3279648 100644 --- a/go.mod +++ b/go.mod @@ -61,7 +61,6 @@ require ( github.com/samber/lo v1.39.0 github.com/sevennt/echo-pprof v0.1.1-0.20220616082843-66a461746b5f github.com/spf13/cobra v1.8.0 - github.com/spf13/pflag v1.0.5 github.com/timberio/go-datemath v0.1.0 go.mongodb.org/mongo-driver v1.12.1 go.opentelemetry.io/otel v1.19.0 @@ -221,6 +220,7 @@ require ( github.com/sergi/go-diff v1.3.1 // indirect github.com/sethvargo/go-retry v0.2.4 // indirect github.com/sirupsen/logrus v1.9.3 // indirect + github.com/spf13/pflag v1.0.5 // indirect github.com/stoewer/go-strcase v1.3.0 // indirect github.com/tidwall/gjson v1.17.0 // indirect github.com/tidwall/match v1.1.1 // indirect @@ -285,6 +285,4 @@ require ( sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect ) -// replace github.com/flanksource/commons => ../commons - -// replace github.com/flanksource/duty => ../duty +// replace "github.com/flanksource/duty" => ../duty diff --git a/pkg/api.go b/pkg/api.go index 32b53251e..1722ece8c 100644 --- a/pkg/api.go +++ b/pkg/api.go @@ -64,62 +64,6 @@ func (s CheckStatus) GetTime() (time.Time, error) { return time.Parse("2006-01-02 15:04:05", s.Time) } -type Latency struct { - Percentile99 float64 `json:"p99,omitempty" db:"p99"` - Percentile97 float64 `json:"p97,omitempty" db:"p97"` - Percentile95 float64 `json:"p95,omitempty" db:"p95"` - Rolling1H float64 `json:"rolling1h"` -} - -func (l Latency) String() string { - s := "" - if l.Percentile99 != 0 { - s += fmt.Sprintf("p99=%s", utils.Age(time.Duration(l.Percentile99)*time.Millisecond)) - } - if l.Percentile95 != 0 { - s += fmt.Sprintf("p95=%s", utils.Age(time.Duration(l.Percentile95)*time.Millisecond)) - } - if l.Percentile97 != 0 { - s += fmt.Sprintf("p97=%s", utils.Age(time.Duration(l.Percentile97)*time.Millisecond)) - } - if l.Rolling1H != 0 { - s += fmt.Sprintf("rolling1h=%s", utils.Age(time.Duration(l.Rolling1H)*time.Millisecond)) - } - return s -} - -type Uptime struct { - Passed int `json:"passed"` - Failed int `json:"failed"` - P100 float64 `json:"p100,omitempty"` - LastPass *time.Time `json:"last_pass,omitempty"` - LastFail *time.Time `json:"last_fail,omitempty"` -} - -func (u Uptime) String() string { - if u.Passed == 0 && u.Failed == 0 { - return "" - } - if u.Passed == 0 { - return fmt.Sprintf("0/%d 0%%", u.Failed) - } - percentage := 100.0 * (1 - (float64(u.Failed) / float64(u.Passed+u.Failed))) - return fmt.Sprintf("%d/%d (%0.1f%%)", u.Passed, u.Passed+u.Failed, percentage) -} - -type Timeseries struct { - Key string `json:"key,omitempty"` - Time string `json:"time,omitempty"` - Status bool `json:"status,omitempty"` - Message string `json:"message,omitempty"` - Error string `json:"error,omitempty"` - Duration int `json:"duration"` - // Count is the number of times the check has been run in the specified time window - Count int `json:"count,omitempty"` - Passed int `json:"passed,omitempty"` - Failed int `json:"failed,omitempty"` -} - type Canary struct { ID uuid.UUID `gorm:"default:generate_ulid()"` AgentID uuid.UUID @@ -212,8 +156,8 @@ type Check struct { Labels types.JSONStringMap `json:"labels" gorm:"type:jsonstringmap"` Description string `json:"description,omitempty"` Status string `json:"status,omitempty"` - Uptime Uptime `json:"uptime" gorm:"-"` - Latency Latency `json:"latency" gorm:"-"` + Uptime types.Uptime `json:"uptime" gorm:"-"` + Latency types.Latency `json:"latency" gorm:"-"` Statuses []CheckStatus `json:"checkStatuses" gorm:"-"` Owner string `json:"owner,omitempty"` Severity string `json:"severity,omitempty"` diff --git a/pkg/api/api.go b/pkg/api/api.go index 71aba9c3e..896b17239 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -9,6 +9,7 @@ import ( "github.com/flanksource/duty/context" "github.com/flanksource/duty/models" "github.com/flanksource/duty/query" + "github.com/flanksource/duty/types" "github.com/labstack/echo/v4" "github.com/flanksource/canary-checker/pkg" @@ -44,11 +45,11 @@ type Response struct { } type DetailResponse struct { - Duration int `json:"duration,omitempty"` - RunnerName string `json:"runnerName"` - Status []pkg.Timeseries `json:"status"` - Latency pkg.Latency `json:"latency"` - Uptime pkg.Uptime `json:"uptime"` + Duration int `json:"duration,omitempty"` + RunnerName string `json:"runnerName"` + Status []query.Timeseries `json:"status"` + Latency types.Latency `json:"latency"` + Uptime types.Uptime `json:"uptime"` } func About(c echo.Context) error { @@ -60,30 +61,15 @@ func About(c echo.Context) error { } func CheckDetails(c echo.Context) error { - q, err := cache.ParseQuery(c) - if err != nil { + ctx := c.Request().Context().(context.Context) + + var q query.CheckQueryParams + if err := q.Init(c.QueryParams()); err != nil { return errorResponse(c, err, http.StatusBadRequest) } start := time.Now() - - end := q.GetEndTime() - since := q.GetStartTime() - timeRange := end.Sub(*since) - - if timeRange <= time.Hour*2 { - q.WindowDuration = time.Minute - } else if timeRange >= time.Hour*24 { - q.WindowDuration = time.Minute * 15 - } else if timeRange >= time.Hour*24*7 { - q.WindowDuration = time.Minute * 60 - } else { - q.WindowDuration = time.Hour * 4 - } - - ctx := c.Request().Context().(context.Context) - - results, uptime, latency, err := q.ExecuteDetails(ctx, ctx.Pool()) + results, uptime, latency, err := q.ExecuteDetails(ctx) if err != nil { return errorResponse(c, err, http.StatusInternalServerError) } diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go deleted file mode 100644 index 755d63f09..000000000 --- a/pkg/cache/cache.go +++ /dev/null @@ -1,135 +0,0 @@ -package cache - -import ( - "fmt" - "strconv" - "time" - - v1 "github.com/flanksource/canary-checker/api/v1" - "github.com/flanksource/canary-checker/pkg" - "github.com/labstack/echo/v4" - "github.com/pkg/errors" - "github.com/samber/lo" -) - -var DefaultCacheCount int - -var DefaultWindow string - -const AllStatuses = -1 - -type QueryParams struct { - Check string - CanaryID string - Start, End string - Window string - IncludeMessages bool - IncludeDetails bool - _start, _end *time.Time - StatusCount int - Labels map[string]string - Trace bool - WindowDuration time.Duration -} - -func (q QueryParams) Validate() error { - start, err := timeV(q.Start) - if err != nil { - return errors.Wrap(err, "start is invalid") - } - end, err := timeV(q.End) - if err != nil { - return errors.Wrap(err, "end is invalid") - } - if start != nil && end != nil { - if end.Before(*start) { - return fmt.Errorf("end time must be after start time") - } - } - return nil -} - -func (q QueryParams) GetStartTime() *time.Time { - if q._start != nil || q.Start == "" { - return q._start - } - q._start, _ = timeV(q.Start) - return q._start -} - -func (q QueryParams) GetEndTime() *time.Time { - if q._end != nil { - return q._end - } - if q.End == "" { - q._end = lo.ToPtr(time.Now()) - } - q._end, _ = timeV(q.End) - return q._end -} - -func (q QueryParams) String() string { - return fmt.Sprintf("check:=%s, start=%s, end=%s, count=%d", q.Check, q.Start, q.End, q.StatusCount) -} - -func ParseQuery(c echo.Context) (*QueryParams, error) { - queryParams := c.Request().URL.Query() - count := queryParams.Get("count") - var cacheCount int64 - var err error - if count != "" { - cacheCount, err = strconv.ParseInt(count, 10, 64) - if err != nil { - return nil, fmt.Errorf("count must be a number: %s", count) - } - } else { - cacheCount = int64(DefaultCacheCount) - } - - since := queryParams.Get("since") - if since == "" { - since = queryParams.Get("start") - } - if since == "" { - since = DefaultWindow - } - - until := queryParams.Get("until") - if until == "" { - until = queryParams.Get("end") - } - if until == "" { - until = "0s" - } - - q := QueryParams{ - Start: since, - End: until, - Window: queryParams.Get("window"), - IncludeMessages: isTrue(queryParams.Get("includeMessages")), - IncludeDetails: isTrue(queryParams.Get("includeDetails")), - Check: queryParams.Get("check"), - StatusCount: int(cacheCount), - Trace: isTrue(queryParams.Get("trace")), - CanaryID: queryParams.Get("canary_id"), - } - - if err := q.Validate(); err != nil { - return nil, err - } - - return &q, nil -} - -func isTrue(v string) bool { - return v == "true" -} - -type Cache interface { - Add(check pkg.Check, status ...pkg.CheckStatus) - GetDetails(checkkey string, time string) interface{} - RemoveChecks(canary v1.Canary) - Query(q QueryParams) (pkg.Checks, error) - QueryStatus(q QueryParams) ([]pkg.Timeseries, error) - RemoveCheckByKey(key string) -} diff --git a/pkg/cache/postgres_query.go b/pkg/cache/postgres_query.go deleted file mode 100644 index 6a7c2fdc8..000000000 --- a/pkg/cache/postgres_query.go +++ /dev/null @@ -1,144 +0,0 @@ -package cache - -import ( - "context" - "fmt" - "strings" - "time" - - "github.com/asecurityteam/rolling" - "github.com/flanksource/canary-checker/pkg" - "github.com/flanksource/commons/duration" - "github.com/jackc/pgx/v5" -) - -type Querier interface { - Query(ctx context.Context, query string, args ...interface{}) (pgx.Rows, error) -} - -func parseDuration(d string, name string) (clause string, arg interface{}, err error) { - if d == "" { - return "", nil, nil - } - dur, err := duration.ParseDuration(d) - if err == nil { - return fmt.Sprintf("(NOW() at TIME ZONE 'utc' - Interval '1 minute' * :%s)", name), dur.Minutes(), nil - } - if timestamp, err := time.Parse(time.RFC3339, d); err == nil { - return ":" + name, timestamp, nil - } - return "", nil, fmt.Errorf("start time must be a duration or RFC3339 timestamp") -} - -func (q QueryParams) GetWhereClause() (string, map[string]interface{}, error) { - clause := "" - args := make(map[string]interface{}) - and := " AND " - if q.Check != "" { - clause = "check_id = :check_key" - args["check_key"] = q.Check - } - if q.Start != "" && q.End == "" { - if clause != "" { - clause += and - } - start, arg, err := parseDuration(q.Start, "start") - if err != nil { - return "", nil, err - } - args["start"] = arg - clause += "time > " + start - } else if q.Start == "" && q.End != "" { - if clause != "" { - clause += and - } - end, arg, err := parseDuration(q.End, "end") - if err != nil { - return "", nil, err - } - args["end"] = arg - clause += "time < " + end - } - if q.Start != "" && q.End != "" { - if clause != "" { - clause += and - } - start, arg, err := parseDuration(q.Start, "start") - if err != nil { - return "", nil, err - } - args["start"] = arg - end, arg, err := parseDuration(q.End, "end") - if err != nil { - return "", nil, err - } - args["end"] = arg - clause += "time BETWEEN " + start + and + end - } - return strings.TrimSpace(clause), args, nil -} - -func (q QueryParams) ExecuteDetails(ctx context.Context, db Querier) ([]pkg.Timeseries, pkg.Uptime, pkg.Latency, error) { - start := q.GetStartTime().Format(time.RFC3339) - end := q.GetEndTime().Format(time.RFC3339) - - query := ` -With grouped_by_window AS ( - SELECT - duration, - status, - CASE WHEN check_statuses.status = TRUE THEN 1 ELSE 0 END AS passed, - CASE WHEN check_statuses.status = FALSE THEN 1 ELSE 0 END AS failed, - to_timestamp(floor((extract(epoch FROM time) + $1) / $2) * $2) AS time - FROM check_statuses - WHERE - time >= $3 AND - time <= $4 AND - check_id = $5 -) -SELECT - time, - bool_and(status), - AVG(duration)::integer as duration, - sum(passed) as passed, - sum(failed) as failed -FROM - grouped_by_window -GROUP BY time -ORDER BY time -` - args := []any{q.WindowDuration.Seconds() / 2, q.WindowDuration.Seconds(), start, end, q.Check} - - if q.WindowDuration == 0 { - // FIXME - query = `SELECT time, status, duration, - CASE WHEN check_statuses.status = TRUE THEN 1 ELSE 0 END AS passed, - CASE WHEN check_statuses.status = FALSE THEN 1 ELSE 0 END AS failed - FROM check_statuses WHERE time >= $1 AND time <= $2 AND check_id = $3` - args = []any{start, end, q.Check} - } - uptime := pkg.Uptime{} - latency := rolling.NewPointPolicy(rolling.NewWindow(100)) - - rows, err := db.Query(ctx, query, args...) - if err != nil { - return nil, uptime, pkg.Latency{}, err - } - defer rows.Close() - - var results []pkg.Timeseries - for rows.Next() { - var datapoint pkg.Timeseries - var ts time.Time - if err := rows.Scan(&ts, &datapoint.Status, &datapoint.Duration, &datapoint.Passed, &datapoint.Failed); err != nil { - return nil, uptime, pkg.Latency{}, err - } - uptime.Failed += datapoint.Failed - uptime.Passed += datapoint.Passed - latency.Append(float64(datapoint.Duration)) - datapoint.Time = ts.Format(time.RFC3339) - results = append(results, datapoint) - } - - return results, uptime, pkg.Latency{Percentile95: latency.Reduce(rolling.Percentile(95))}, nil -} diff --git a/pkg/cache/postgres_util.go b/pkg/cache/postgres_util.go index 1582bda0b..cc0a38602 100644 --- a/pkg/cache/postgres_util.go +++ b/pkg/cache/postgres_util.go @@ -3,9 +3,6 @@ package cache import ( "fmt" "strings" - "time" - - "github.com/flanksource/commons/duration" ) func ConvertNamedParamsDebug(sql string, namedArgs map[string]interface{}) string { @@ -27,28 +24,3 @@ func ConvertNamedParams(sql string, namedArgs map[string]interface{}) (string, [ } return sql, args } - -func timeV(v interface{}) (*time.Time, error) { - if v == nil { - return nil, nil - } - switch v := v.(type) { - case time.Time: - return &v, nil - case time.Duration: - t := time.Now().Add(v * -1) - return &t, nil - case string: - if v == "" { - return nil, nil - } - if t, err := time.Parse(time.RFC3339, v); err == nil { - return &t, nil - } else if d, err := duration.ParseDuration(v); err == nil { - t := time.Now().Add(time.Duration(d) * -1) - return &t, nil - } - return nil, fmt.Errorf("time must be a duration or RFC3339 timestamp") - } - return nil, fmt.Errorf("unknown time type %T", v) -} diff --git a/pkg/jobs/canary/status.go b/pkg/jobs/canary/status.go index dfb67f01a..4d98c6125 100644 --- a/pkg/jobs/canary/status.go +++ b/pkg/jobs/canary/status.go @@ -6,11 +6,12 @@ import ( v1 "github.com/flanksource/canary-checker/api/v1" "github.com/flanksource/canary-checker/pkg" - "github.com/flanksource/canary-checker/pkg/cache" "github.com/flanksource/canary-checker/pkg/db" "github.com/flanksource/canary-checker/pkg/metrics" "github.com/flanksource/canary-checker/pkg/utils" "github.com/flanksource/duty/context" + "github.com/flanksource/duty/query" + dutyTypes "github.com/flanksource/duty/types" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" ) @@ -28,7 +29,7 @@ func updateCanaryStatusAndEvent(ctx context.Context, canary v1.Canary, results [ var pass = true var lastTransitionedTime *metav1.Time var highestLatency float64 - var uptimeAgg pkg.Uptime + var uptimeAgg dutyTypes.Uptime transitioned := false for _, result := range results { @@ -53,7 +54,7 @@ func updateCanaryStatusAndEvent(ctx context.Context, canary v1.Canary, results [ } // Transition - q := cache.QueryParams{Check: checkID, StatusCount: 1} + q := query.CheckQueryParams{Check: checkID, StatusCount: 1} if canary.Status.LastTransitionedTime != nil { q.Start = canary.Status.LastTransitionedTime.Format(time.RFC3339) } diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 982b935c5..fa56974c5 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -8,8 +8,10 @@ import ( "github.com/flanksource/canary-checker/pkg" "github.com/flanksource/canary-checker/pkg/runner" "github.com/flanksource/commons/logger" + "github.com/flanksource/duty/types" cmap "github.com/orcaman/concurrent-map" "github.com/prometheus/client_golang/prometheus" + "github.com/samber/lo" ) var ( @@ -120,8 +122,8 @@ func RemoveCheckByKey(key string) { latencies.Remove(key) } -func GetMetrics(key string) (uptime pkg.Uptime, latency pkg.Latency) { - uptime = pkg.Uptime{} +func GetMetrics(key string) (uptime types.Uptime, latency types.Latency) { + uptime = types.Uptime{} fail, ok := failed.Get(key) if ok { @@ -135,12 +137,12 @@ func GetMetrics(key string) (uptime pkg.Uptime, latency pkg.Latency) { lat, ok := latencies.Get(key) if ok { - latency = pkg.Latency{Rolling1H: lat.(*rolling.TimePolicy).Reduce(rolling.Percentile(95))} + latency = types.Latency{Rolling1H: lat.(*rolling.TimePolicy).Reduce(rolling.Percentile(95))} } return } -func Record(canary v1.Canary, result *pkg.CheckResult) (_uptime pkg.Uptime, _latency pkg.Latency) { +func Record(canary v1.Canary, result *pkg.CheckResult) (_uptime types.Uptime, _latency types.Latency) { defer func() { e := recover() if e != nil { @@ -236,11 +238,11 @@ func Record(canary v1.Canary, result *pkg.CheckResult) (_uptime pkg.Uptime, _lat OpsFailedCount.WithLabelValues(checkType, endpoint, canaryName, canaryNamespace, owner, severity, key, name).Inc() } - _uptime = pkg.Uptime{Passed: int(pass.Reduce(rolling.Sum)), Failed: int(fail.Reduce(rolling.Sum))} + _uptime = types.Uptime{Passed: int(pass.Reduce(rolling.Sum)), Failed: int(fail.Reduce(rolling.Sum))} if latency != nil { - _latency = pkg.Latency{Rolling1H: latency.Reduce(rolling.Percentile(95))} + _latency = types.Latency{Rolling1H: latency.Reduce(rolling.Percentile(95))} } else { - _latency = pkg.Latency{} + _latency = types.Latency{} } return _uptime, _latency } @@ -308,7 +310,7 @@ func getOrCreateHistogram(m pkg.Metric) error { } } -func FillLatencies(checkKey string, duration string, latency *pkg.Latency) error { +func FillLatencies(checkKey string, duration string, latency *types.Latency) error { if runner.Prometheus == nil || duration == "" { return nil } @@ -331,7 +333,7 @@ func FillLatencies(checkKey string, duration string, latency *pkg.Latency) error return nil } -func FillUptime(checkKey, duration string, uptime *pkg.Uptime) error { +func FillUptime(checkKey, duration string, uptime *types.Uptime) error { if runner.Prometheus == nil || duration == "" { return nil } @@ -339,7 +341,7 @@ func FillUptime(checkKey, duration string, uptime *pkg.Uptime) error { if err != nil { return err } - uptime.P100 = value + uptime.P100 = lo.ToPtr(value) return nil }