diff --git a/app/initriver.go b/app/initriver.go index 8d6d8da522..9683332776 100644 --- a/app/initriver.go +++ b/app/initriver.go @@ -3,6 +3,7 @@ package app import ( "context" "log/slog" + "time" "github.com/riverqueue/river" "github.com/riverqueue/river/riverdriver/riverdatabasesql" @@ -38,10 +39,6 @@ func (r *riverErrs) HandlePanic(ctx context.Context, job *rivertype.JobRow, pani return nil } -type noopWorker struct{} - -func (noopWorker) Kind() string { return "noop" } - // ignoreCancel is a slog.Handler that ignores log records with an "error" attribute of "context canceled". type ignoreCancel struct{ h slog.Handler } @@ -84,14 +81,6 @@ func (w workerMiddlewareFunc) Work(ctx context.Context, job *rivertype.JobRow, d func (app *App) initRiver(ctx context.Context) error { app.RiverWorkers = river.NewWorkers() - // TODO: remove once a worker is added that's not behind a feature flag - // - // Without this, it will complain about no workers being registered. - river.AddWorker(app.RiverWorkers, river.WorkFunc(func(ctx context.Context, j *river.Job[noopWorker]) error { - // Do something with the job - return nil - })) - var err error app.River, err = river.NewClient(riverpgxv5.New(app.pgx), &river.Config{ // River tends to log "context canceled" errors while shutting down @@ -100,6 +89,7 @@ func (app *App) initRiver(ctx context.Context) error { Queues: map[string]river.QueueConfig{ river.QueueDefault: {MaxWorkers: 100}, }, + RescueStuckJobsAfter: 5 * time.Minute, WorkerMiddleware: []rivertype.WorkerMiddleware{ workerMiddlewareFunc(func(ctx context.Context, doInner func(ctx context.Context) error) error { // Ensure config is set in the context for all workers. 
diff --git a/engine/cleanupmanager/alertlogs.go b/engine/cleanupmanager/alertlogs.go new file mode 100644 index 0000000000..0d1a594695 --- /dev/null +++ b/engine/cleanupmanager/alertlogs.go @@ -0,0 +1,108 @@ +package cleanupmanager + +import ( + "context" + "database/sql" + "errors" + "fmt" + + "github.com/jackc/pgx/v5" + "github.com/riverqueue/river" + "github.com/target/goalert/gadb" +) + +type AlertLogLFWArgs struct{} + +func (AlertLogLFWArgs) Kind() string { return "cleanup-manager-alert-logs-lfw" } + +type AlertLogArgs struct { + StartID int64 + EndID int64 +} + +const ( + batchSize = 5000 + blockSize = 100000 +) + +// LookForWorkAlertLogs will schedule alert log cleanup jobs for blocks of alert log IDs. +// +// The strategy here is to look for the minimum and maximum alert log IDs in the database, then schedule jobs for each `blockSize` block of IDs, +// and those jobs will then cleanup the alert logs in that range `batchSize` at a time. +func (db *DB) LookForWorkAlertLogs(ctx context.Context, j *river.Job[AlertLogLFWArgs]) error { + var min, max int64 + err := db.lock.WithTxShared(ctx, func(ctx context.Context, tx *sql.Tx) error { + row, err := gadb.New(tx).CleanupMgrAlertLogsMinMax(ctx) + if err != nil { + return err + } + min, max = row.MinID, row.MaxID + return nil + }) + if err != nil { + return fmt.Errorf("get min/max alert log ID: %w", err) + } + if min == 0 && max == 0 { + return nil + } + + max++ + + var params []river.InsertManyParams + for i := int64(0); i < max; i += blockSize { + if i+blockSize <= min { + // skip sparse blocks + continue + } + + params = append(params, river.InsertManyParams{ + Args: AlertLogArgs{StartID: i, EndID: i + blockSize}, + InsertOpts: &river.InsertOpts{ + Queue: QueueName, + Priority: PriorityAlertLogs, + UniqueOpts: river.UniqueOpts{ByArgs: true}, + }, + }) + } + + if len(params) == 0 { + return nil + } + + _, err = river.ClientFromContext[pgx.Tx](ctx).InsertMany(ctx, params) + if err != nil { + return fmt.Errorf("insert many: 
%w", err) + } + + return nil +} + +func (AlertLogArgs) Kind() string { return "cleanup-manager-alert-logs" } + +// CleanupAlertLogs will remove alert log entries for deleted alerts. +func (db *DB) CleanupAlertLogs(ctx context.Context, j *river.Job[AlertLogArgs]) error { + lastID := j.Args.StartID + + err := db.whileWork(ctx, func(ctx context.Context, tx *sql.Tx) (done bool, err error) { + db.logger.DebugContext(ctx, "Cleaning up alert logs...", "lastID", lastID) + lastID, err = gadb.New(tx).CleanupAlertLogs(ctx, + gadb.CleanupAlertLogsParams{ + BatchSize: batchSize, + StartID: lastID, + EndID: j.Args.EndID, + }) + if errors.Is(err, sql.ErrNoRows) { + return true, nil + } + if err != nil { + return false, fmt.Errorf("cleanup alert logs: %w", err) + } + + return false, nil + }) + if err != nil { + return fmt.Errorf("cleanup alert logs: %w", err) + } + + return nil +} diff --git a/engine/cleanupmanager/db.go b/engine/cleanupmanager/db.go index 56f37e8a1a..605f0c855a 100644 --- a/engine/cleanupmanager/db.go +++ b/engine/cleanupmanager/db.go @@ -20,12 +20,9 @@ type DB struct { cleanupSessions *sql.Stmt - cleanupAlertLogs *sql.Stmt - alertStore *alert.Store - logIndex int - logger *slog.Logger + logger *slog.Logger } // Name returns the name of the module. 
@@ -55,22 +52,6 @@ func NewDB(ctx context.Context, db *sql.DB, alertstore *alert.Store, log *slog.L cleanupSessions: p.P(`DELETE FROM auth_user_sessions WHERE id = any(select id from auth_user_sessions where last_access_at < (now() - '30 days'::interval) LIMIT 100 for update skip locked)`), - cleanupAlertLogs: p.P(` - with - scope as (select id from alert_logs where id > $1 order by id limit 100), - id_range as (select min(id), max(id) from scope), - _delete as ( - delete from alert_logs where id = any( - select id from alert_logs - where - id between (select min from id_range) and (select max from id_range) and - not exists (select 1 from alerts where alert_id = id) - for update skip locked - ) - ) - select id from scope offset 99 - `), - alertStore: alertstore, }, p.Err } diff --git a/engine/cleanupmanager/queries.sql b/engine/cleanupmanager/queries.sql index 99398f4156..ae34196666 100644 --- a/engine/cleanupmanager/queries.sql +++ b/engine/cleanupmanager/queries.sql @@ -129,3 +129,63 @@ FROM WHERE id = ANY (sqlc.arg(user_ids)::uuid[]); +-- name: CleanupMgrAlertLogsMinMax :one +-- CleanupMgrAlertLogsMinMax will find the minimum and maximum id of the alert_logs table. 
+SELECT + coalesce(min(id), 0)::bigint AS min_id, + coalesce(max(id), 0)::bigint AS max_id +FROM + alert_logs; + +-- name: CleanupAlertLogs :one +WITH scope AS ( + SELECT + id + FROM + alert_logs l + WHERE + l.id BETWEEN @start_id AND @end_id - 1 + ORDER BY + l.id + LIMIT @batch_size +), +id_range AS ( + SELECT + min(id), + max(id) + FROM + scope +), +_delete AS ( + DELETE FROM alert_logs + WHERE id = ANY ( + SELECT + id + FROM + alert_logs + WHERE + id BETWEEN ( + SELECT + min + FROM + id_range) + AND ( + SELECT + max + FROM + id_range) + AND NOT EXISTS ( + SELECT + 1 + FROM + alerts + WHERE + alert_id = id) + FOR UPDATE + SKIP LOCKED)) + SELECT + id + FROM + scope OFFSET @batch_size - 1 + LIMIT 1; + diff --git a/engine/cleanupmanager/setup.go b/engine/cleanupmanager/setup.go index e3f3d773d2..787c34df38 100644 --- a/engine/cleanupmanager/setup.go +++ b/engine/cleanupmanager/setup.go @@ -16,10 +16,12 @@ var _ processinglock.Setupable = &DB{} const QueueName = "cleanup-manager" const ( - PriorityAlertCleanup = iota + 1 - PrioritySchedHistory - PriorityTempSchedLFW - PriorityTempSched + PriorityAlertCleanup = 1 + PrioritySchedHistory = 1 + PriorityTempSchedLFW = 2 + PriorityAlertLogsLFW = 2 + PriorityTempSched = 3 + PriorityAlertLogs = 4 ) // whileWork will run the provided function in a loop until it returns done=true. 
@@ -53,6 +55,8 @@ func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { river.AddWorker(args.Workers, river.WorkFunc(db.CleanupShifts)) river.AddWorker(args.Workers, river.WorkFunc(db.CleanupScheduleData)) river.AddWorker(args.Workers, river.WorkFunc(db.LookForWorkScheduleData)) + river.AddWorker(args.Workers, river.WorkFunc(db.CleanupAlertLogs)) + river.AddWorker(args.Workers, river.WorkFunc(db.LookForWorkAlertLogs)) err := args.River.Queues().Add(QueueName, river.QueueConfig{MaxWorkers: 5}) if err != nil { @@ -100,22 +104,13 @@ func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error { args.River.PeriodicJobs().AddMany([]*river.PeriodicJob{ river.NewPeriodicJob( - river.PeriodicInterval(24*time.Hour), - func() (river.JobArgs, *river.InsertOpts) { - return ShiftArgs{}, &river.InsertOpts{ - Queue: QueueName, - } - }, - &river.PeriodicJobOpts{RunOnStart: true}, - ), - }) - - args.River.PeriodicJobs().AddMany([]*river.PeriodicJob{ - river.NewPeriodicJob( - river.PeriodicInterval(24*time.Hour), + river.PeriodicInterval(7*24*time.Hour), func() (river.JobArgs, *river.InsertOpts) { - return SchedDataArgs{}, &river.InsertOpts{ + return AlertLogLFWArgs{}, &river.InsertOpts{ Queue: QueueName, + UniqueOpts: river.UniqueOpts{ + ByArgs: true, + }, } }, &river.PeriodicJobOpts{RunOnStart: true}, diff --git a/engine/cleanupmanager/update.go b/engine/cleanupmanager/update.go index 6c4dd53982..3e0ffc817b 100644 --- a/engine/cleanupmanager/update.go +++ b/engine/cleanupmanager/update.go @@ -2,8 +2,6 @@ package cleanupmanager import ( "context" - "database/sql" - "errors" "fmt" "github.com/jackc/pgtype" @@ -54,15 +52,5 @@ func (db *DB) update(ctx context.Context) error { } } - err = tx.StmtContext(ctx, db.cleanupAlertLogs).QueryRowContext(ctx, db.logIndex).Scan(&db.logIndex) - if errors.Is(err, sql.ErrNoRows) { - // repeat - db.logIndex = 0 - err = nil - } - if err != nil { - return fmt.Errorf("cleanup alert_logs: %w", err) - } - return 
tx.Commit() } diff --git a/gadb/queries.sql.go b/gadb/queries.sql.go index dd66aa74cb..ab21f275ad 100644 --- a/gadb/queries.sql.go +++ b/gadb/queries.sql.go @@ -851,6 +851,98 @@ func (q *Queries) CalSubUserNames(ctx context.Context, dollar_1 []uuid.UUID) ([] return items, nil } +const cleanupAlertLogs = `-- name: CleanupAlertLogs :one +WITH scope AS ( + SELECT + id + FROM + alert_logs l + WHERE + l.id BETWEEN $2 AND $3- 1 + ORDER BY + l.id + LIMIT $4 +), +id_range AS ( + SELECT + min(id), + max(id) + FROM + scope +), +_delete AS ( + DELETE FROM alert_logs + WHERE id = ANY ( + SELECT + id + FROM + alert_logs + WHERE + id BETWEEN ( + SELECT + min + FROM + id_range) + AND ( + SELECT + max + FROM + id_range) + AND NOT EXISTS ( + SELECT + 1 + FROM + alerts + WHERE + alert_id = id) + FOR UPDATE + SKIP LOCKED)) + SELECT + id + FROM + scope OFFSET $1- 1 + LIMIT 1 +` + +type CleanupAlertLogsParams struct { + BatchSize int32 + StartID int64 + EndID int64 +} + +func (q *Queries) CleanupAlertLogs(ctx context.Context, arg CleanupAlertLogsParams) (int64, error) { + row := q.db.QueryRowContext(ctx, cleanupAlertLogs, + arg.BatchSize, + arg.StartID, + arg.EndID, + arg.BatchSize, + ) + var id int64 + err := row.Scan(&id) + return id, err +} + +const cleanupMgrAlertLogsMinMax = `-- name: CleanupMgrAlertLogsMinMax :one +SELECT + coalesce(min(id), 0)::bigint AS min_id, + coalesce(max(id), 0)::bigint AS max_id +FROM + alert_logs +` + +type CleanupMgrAlertLogsMinMaxRow struct { + MinID int64 + MaxID int64 +} + +// CleanupMgrAlertLogsMinMax will find the minimum and maximum id of the alert_logs table. +func (q *Queries) CleanupMgrAlertLogsMinMax(ctx context.Context) (CleanupMgrAlertLogsMinMaxRow, error) { + row := q.db.QueryRowContext(ctx, cleanupMgrAlertLogsMinMax) + var i CleanupMgrAlertLogsMinMaxRow + err := row.Scan(&i.MinID, &i.MaxID) + return i, err +} + const cleanupMgrDeleteOldAlerts = `-- name: CleanupMgrDeleteOldAlerts :execrows DELETE FROM alerts WHERE id = ANY (