Skip to content

Commit

Permalink
engine/cleanupmgr: move alert log operations to job queue (#4211)
Browse files Browse the repository at this point in the history
* cleanup: implement context-aware sleep and cleanup manager functionality

* cleanup: format code for consistency in alert store initialization

* cleanup: remove unused ConfigSource field from SetupArgs struct

* cleanup: refactor alert cleanup logic and add shift cleanup functionality

* cleanup: refactor alert cleanup logic to use whileWork for better control flow

* cleanup: add comment

* cleanup: remove unused cleanup statements from DB and update logic

* test: refactor alert auto-close and cleanup tests for improved reliability

* fix: update ShiftArgs Kind to reflect cleanup-manager-shifts

* feat: add periodic jobs for schedule data cleanup and update queries

* feat: add workers for cleanup of shifts and schedule data

* fix: update periodic job interval for schedule data cleanup to 24 hours

* feat: add logging support to cleanup manager and engine initialization

* refactor: streamline schedule data cleanup by extracting user validation and shift trimming logic

* feat: add logging for schedule data updates in cleanup manager

* refactor: remove unnecessary checks for empty shifts in user and shift trimming functions

* refactor: enhance schedule data cleanup by improving user validation and logging

* refactor: improve formatting of schedule data update call in cleanup manager

* docs: add comment to clarify cleanupData function purpose in schedule data management

* feat: add CleanupAlertLogs function to manage alert log entries for deleted alerts

* refactor: remove unused cleanupAlertLogs statement and related logic from cleanup manager

* feat: implement timeout for CleanupAlertLogs worker to handle longer job durations

* refactor: rename cleanupDays to more descriptive staleThresholdDays in cleanup manager functions

* docs: enhance comment for CleanupMgrScheduleData to clarify last_cleanup_at usage

* engine/cleanupmgr: refactor schedule data cleanup logic and add job for looking up schedules needing cleanup

* engine/cleanupmgr: implement alert log cleanup job scheduling and refactor related queries

* engine/initriver: remove unused noopWorker type

* engine/cleanupmgr: rename LookForWorkArgs types for consistency

* fix(cleanupmgr): handle empty alert_logs by using coalesce for min and max IDs
  • Loading branch information
mastercactapus authored Dec 23, 2024
1 parent 11f21f3 commit 22265b2
Show file tree
Hide file tree
Showing 7 changed files with 276 additions and 62 deletions.
14 changes: 2 additions & 12 deletions app/initriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package app
import (
"context"
"log/slog"
"time"

"github.com/riverqueue/river"
"github.com/riverqueue/river/riverdriver/riverdatabasesql"
Expand Down Expand Up @@ -38,10 +39,6 @@ func (r *riverErrs) HandlePanic(ctx context.Context, job *rivertype.JobRow, pani
return nil
}

type noopWorker struct{}

func (noopWorker) Kind() string { return "noop" }

// ignoreCancel is a slog.Handler that ignores log records with an "error" attribute of "context canceled".
type ignoreCancel struct{ h slog.Handler }

Expand Down Expand Up @@ -84,14 +81,6 @@ func (w workerMiddlewareFunc) Work(ctx context.Context, job *rivertype.JobRow, d
func (app *App) initRiver(ctx context.Context) error {
app.RiverWorkers = river.NewWorkers()

// TODO: remove once a worker is added that's not behind a feature flag
//
// Without this, it will complain about no workers being registered.
river.AddWorker(app.RiverWorkers, river.WorkFunc(func(ctx context.Context, j *river.Job[noopWorker]) error {
// Do something with the job
return nil
}))

var err error
app.River, err = river.NewClient(riverpgxv5.New(app.pgx), &river.Config{
// River tends to log "context canceled" errors while shutting down
Expand All @@ -100,6 +89,7 @@ func (app *App) initRiver(ctx context.Context) error {
Queues: map[string]river.QueueConfig{
river.QueueDefault: {MaxWorkers: 100},
},
RescueStuckJobsAfter: 5 * time.Minute,
WorkerMiddleware: []rivertype.WorkerMiddleware{
workerMiddlewareFunc(func(ctx context.Context, doInner func(ctx context.Context) error) error {
// Ensure config is set in the context for all workers.
Expand Down
108 changes: 108 additions & 0 deletions engine/cleanupmanager/alertlogs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package cleanupmanager

import (
"context"
"database/sql"
"errors"
"fmt"

"github.com/jackc/pgx/v5"
"github.com/riverqueue/river"
"github.com/target/goalert/gadb"
)

type AlertLogLFWArgs struct{}

func (AlertLogLFWArgs) Kind() string { return "cleanup-manager-alert-logs-lfw" }

type AlertLogArgs struct {
StartID int64
EndID int64
}

const (
batchSize = 5000
blockSize = 100000
)

// LookForWorkAlertLogs will schedule alert log cleanup jobs for blocks of alert log IDs.
//
// The strategy here is to look for the minimum and maximum alert log IDs in the database, then schedule jobs for each `blockSize` block of IDs,
// and those jobs will then cleanup the alert logs in that range `batchSize` at a time.
func (db *DB) LookForWorkAlertLogs(ctx context.Context, j *river.Job[AlertLogLFWArgs]) error {
var min, max int64
err := db.lock.WithTxShared(ctx, func(ctx context.Context, tx *sql.Tx) error {
row, err := gadb.New(tx).CleanupMgrAlertLogsMinMax(ctx)
if err != nil {
return err
}
min, max = row.MinID, row.MaxID
return nil
})
if min == 0 && max == 0 {
return nil
}
if err != nil {
return fmt.Errorf("get min/max alert log ID: %w", err)
}

max++

var params []river.InsertManyParams
for i := int64(0); i < max; i += blockSize {
if i < min {
// skip sparse blocks
continue
}

params = append(params, river.InsertManyParams{
Args: AlertLogArgs{StartID: i, EndID: i + blockSize},
InsertOpts: &river.InsertOpts{
Queue: QueueName,
Priority: PriorityAlertLogs,
UniqueOpts: river.UniqueOpts{ByArgs: true},
},
})
}

if len(params) == 0 {
return nil
}

_, err = river.ClientFromContext[pgx.Tx](ctx).InsertMany(ctx, params)
if err != nil {
return fmt.Errorf("insert many: %w", err)
}

return nil
}

func (AlertLogArgs) Kind() string { return "cleanup-manager-alert-logs" }

// CleanupAlertLogs will remove alert log entries for deleted alerts.
func (db *DB) CleanupAlertLogs(ctx context.Context, j *river.Job[AlertLogArgs]) error {
lastID := j.Args.StartID

err := db.whileWork(ctx, func(ctx context.Context, tx *sql.Tx) (done bool, err error) {
db.logger.DebugContext(ctx, "Cleaning up alert logs...", "lastID", lastID)
lastID, err = gadb.New(tx).CleanupAlertLogs(ctx,
gadb.CleanupAlertLogsParams{
BatchSize: batchSize,
StartID: lastID,
EndID: j.Args.EndID,
})
if errors.Is(err, sql.ErrNoRows) {
return true, nil
}
if err != nil {
return false, fmt.Errorf("cleanup alert logs: %w", err)
}

return false, nil
})
if err != nil {
return fmt.Errorf("cleanup alert logs: %w", err)
}

return nil
}
21 changes: 1 addition & 20 deletions engine/cleanupmanager/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,9 @@ type DB struct {

cleanupSessions *sql.Stmt

cleanupAlertLogs *sql.Stmt

alertStore *alert.Store

logIndex int
logger *slog.Logger
logger *slog.Logger
}

// Name returns the name of the module.
Expand Down Expand Up @@ -55,22 +52,6 @@ func NewDB(ctx context.Context, db *sql.DB, alertstore *alert.Store, log *slog.L

cleanupSessions: p.P(`DELETE FROM auth_user_sessions WHERE id = any(select id from auth_user_sessions where last_access_at < (now() - '30 days'::interval) LIMIT 100 for update skip locked)`),

cleanupAlertLogs: p.P(`
with
scope as (select id from alert_logs where id > $1 order by id limit 100),
id_range as (select min(id), max(id) from scope),
_delete as (
delete from alert_logs where id = any(
select id from alert_logs
where
id between (select min from id_range) and (select max from id_range) and
not exists (select 1 from alerts where alert_id = id)
for update skip locked
)
)
select id from scope offset 99
`),

alertStore: alertstore,
}, p.Err
}
60 changes: 60 additions & 0 deletions engine/cleanupmanager/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,63 @@ FROM
WHERE
id = ANY (sqlc.arg(user_ids)::uuid[]);

-- name: CleanupMgrAlertLogsMinMax :one
-- CleanupMgrAlertLogsMinMax will find the minimum and maximum id of the alert_logs table.
SELECT
coalesce(min(id), 0)::bigint AS min_id,
coalesce(max(id), 0)::bigint AS max_id
FROM
alert_logs;

-- name: CleanupAlertLogs :one
WITH scope AS (
SELECT
id
FROM
alert_logs l
WHERE
l.id BETWEEN @start_id AND @end_id - 1
ORDER BY
l.id
LIMIT @batch_size
),
id_range AS (
SELECT
min(id),
max(id)
FROM
scope
),
_delete AS (
DELETE FROM alert_logs
WHERE id = ANY (
SELECT
id
FROM
alert_logs
WHERE
id BETWEEN (
SELECT
min
FROM
id_range)
AND (
SELECT
max
FROM
id_range)
AND NOT EXISTS (
SELECT
1
FROM
alerts
WHERE
alert_id = id)
FOR UPDATE
SKIP LOCKED))
SELECT
id
FROM
scope OFFSET @batch_size - 1
LIMIT 1;

31 changes: 13 additions & 18 deletions engine/cleanupmanager/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ var _ processinglock.Setupable = &DB{}
const QueueName = "cleanup-manager"

const (
PriorityAlertCleanup = iota + 1
PrioritySchedHistory
PriorityTempSchedLFW
PriorityTempSched
PriorityAlertCleanup = 1
PrioritySchedHistory = 1
PriorityTempSchedLFW = 2
PriorityAlertLogsLFW = 2
PriorityTempSched = 3
PriorityAlertLogs = 4
)

// whileWork will run the provided function in a loop until it returns done=true.
Expand Down Expand Up @@ -53,6 +55,8 @@ func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error {
river.AddWorker(args.Workers, river.WorkFunc(db.CleanupShifts))
river.AddWorker(args.Workers, river.WorkFunc(db.CleanupScheduleData))
river.AddWorker(args.Workers, river.WorkFunc(db.LookForWorkScheduleData))
river.AddWorker(args.Workers, river.WorkFunc(db.CleanupAlertLogs))
river.AddWorker(args.Workers, river.WorkFunc(db.LookForWorkAlertLogs))

err := args.River.Queues().Add(QueueName, river.QueueConfig{MaxWorkers: 5})
if err != nil {
Expand Down Expand Up @@ -100,22 +104,13 @@ func (db *DB) Setup(ctx context.Context, args processinglock.SetupArgs) error {

args.River.PeriodicJobs().AddMany([]*river.PeriodicJob{
river.NewPeriodicJob(
river.PeriodicInterval(24*time.Hour),
func() (river.JobArgs, *river.InsertOpts) {
return ShiftArgs{}, &river.InsertOpts{
Queue: QueueName,
}
},
&river.PeriodicJobOpts{RunOnStart: true},
),
})

args.River.PeriodicJobs().AddMany([]*river.PeriodicJob{
river.NewPeriodicJob(
river.PeriodicInterval(24*time.Hour),
river.PeriodicInterval(7*24*time.Hour),
func() (river.JobArgs, *river.InsertOpts) {
return SchedDataArgs{}, &river.InsertOpts{
return AlertLogLFWArgs{}, &river.InsertOpts{
Queue: QueueName,
UniqueOpts: river.UniqueOpts{
ByArgs: true,
},
}
},
&river.PeriodicJobOpts{RunOnStart: true},
Expand Down
12 changes: 0 additions & 12 deletions engine/cleanupmanager/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package cleanupmanager

import (
"context"
"database/sql"
"errors"
"fmt"

"github.com/jackc/pgtype"
Expand Down Expand Up @@ -54,15 +52,5 @@ func (db *DB) update(ctx context.Context) error {
}
}

err = tx.StmtContext(ctx, db.cleanupAlertLogs).QueryRowContext(ctx, db.logIndex).Scan(&db.logIndex)
if errors.Is(err, sql.ErrNoRows) {
// repeat
db.logIndex = 0
err = nil
}
if err != nil {
return fmt.Errorf("cleanup alert_logs: %w", err)
}

return tx.Commit()
}
Loading

0 comments on commit 22265b2

Please sign in to comment.