Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup live queries #22438

Merged
merged 2 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/22094-cleanup-queries
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Updated the activity cleanup job to remove all expired live queries, improving API performance in environments with large volumes of live queries. Note that the cleanup cron job may take longer on its first run after the upgrade.
74 changes: 52 additions & 22 deletions server/datastore/mysql/activities.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,35 +539,65 @@ func (ds *Datastore) CleanupActivitiesAndAssociatedData(ctx context.Context, max
// `activities` and `queries` are not tied because the activity itself holds
// the query SQL so they don't need to be executed on the same transaction.
//
if err := ds.withTx(ctx, func(tx sqlx.ExtContext) error {
// Delete temporary queries (aka "not saved").
if _, err := tx.ExecContext(ctx,
`DELETE FROM queries
// All expired live queries are deleted in batches of `maxCount` rows so that
// the table size is kept in check under high volumes of live queries (zero-trust workflows).
// This differs from the `activities` cleanup which uses maxCount as a limit to
// the number of activities to delete.
//
for {
if ctx.Err() != nil {
return ctx.Err()
}

var rowsAffected int64

// Start a new transaction for each batch of deletions.
err := ds.withTx(ctx, func(tx sqlx.ExtContext) error {
// Delete expired live queries (aka "not saved")
result, err := tx.ExecContext(ctx,
`DELETE FROM queries
WHERE NOT saved AND created_at < DATE_SUB(NOW(), INTERVAL ? DAY)
LIMIT ?`,
expiredWindowDays, maxCount,
); err != nil {
return ctxerr.Wrap(ctx, err, "delete expired non-saved queries")
}
// Delete distributed campaigns that reference unexisting query (removed in the previous query).
if _, err := tx.ExecContext(ctx,
`DELETE distributed_query_campaigns FROM distributed_query_campaigns
expiredWindowDays, maxCount,
)
if err != nil {
return ctxerr.Wrap(ctx, err, "delete expired non-saved queries")
}

rowsAffected, err = result.RowsAffected()
if err != nil {
return ctxerr.Wrap(ctx, err, "retrieving rows affected from delete query")
}

// Cleanup orphaned distributed campaigns that reference non-existing queries.
if _, err := tx.ExecContext(ctx,
`DELETE distributed_query_campaigns FROM distributed_query_campaigns
LEFT JOIN queries ON (distributed_query_campaigns.query_id=queries.id)
WHERE queries.id IS NULL`,
); err != nil {
return ctxerr.Wrap(ctx, err, "delete expired orphaned distributed_query_campaigns")
}
// Delete distributed campaign targets that reference unexisting distributed campaign (removed in the previous query).
if _, err := tx.ExecContext(ctx,
`DELETE distributed_query_campaign_targets FROM distributed_query_campaign_targets
); err != nil {
return ctxerr.Wrap(ctx, err, "delete expired orphaned distributed_query_campaigns")
}

// Cleanup orphaned distributed campaign targets that reference non-existing distributed campaigns.
if _, err := tx.ExecContext(ctx,
`DELETE distributed_query_campaign_targets FROM distributed_query_campaign_targets
LEFT JOIN distributed_query_campaigns ON (distributed_query_campaign_targets.distributed_query_campaign_id=distributed_query_campaigns.id)
WHERE distributed_query_campaigns.id IS NULL`,
); err != nil {
return ctxerr.Wrap(ctx, err, "delete expired orphaned distributed_query_campaign_targets")
); err != nil {
return ctxerr.Wrap(ctx, err, "delete expired orphaned distributed_query_campaign_targets")
}

return nil
})
if err != nil {
return ctxerr.Wrap(ctx, err, "delete expired queries in batch")
}

// Break the loop if no rows were deleted in the current batch.
if rowsAffected == 0 {
break
}
return nil
}); err != nil {
return ctxerr.Wrap(ctx, err, "delete expired distributed queries")
}

return nil
}
4 changes: 2 additions & 2 deletions server/datastore/mysql/activities_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -991,7 +991,7 @@ func testCleanupActivitiesAndAssociatedDataBatch(t *testing.T, ds *Datastore) {
ExecAdhocSQL(t, ds, func(q sqlx.ExtContext) error {
return sqlx.GetContext(ctx, q, &queriesLen, `SELECT COUNT(*) FROM queries WHERE NOT saved;`)
})
require.Equal(t, 1000, queriesLen)
require.Equal(t, 250, queriesLen) // All expired queries should be cleaned up.

err = ds.CleanupActivitiesAndAssociatedData(ctx, maxCount, 1)
require.NoError(t, err)
Expand All @@ -1002,7 +1002,7 @@ func testCleanupActivitiesAndAssociatedDataBatch(t *testing.T, ds *Datastore) {
ExecAdhocSQL(t, ds, func(q sqlx.ExtContext) error {
return sqlx.GetContext(ctx, q, &queriesLen, `SELECT COUNT(*) FROM queries WHERE NOT saved;`)
})
require.Equal(t, 500, queriesLen)
require.Equal(t, 250, queriesLen)

err = ds.CleanupActivitiesAndAssociatedData(ctx, maxCount, 1)
require.NoError(t, err)
Expand Down
Loading